Qore Programming Language  1.12.1
qore_socket_private.h
1 /* -*- mode: c++; indent-tabs-mode: nil -*- */
2 /*
3  qore_socket_private.h
4 
5  Qore Programming Language
6 
7  Copyright (C) 2003 - 2022 Qore Technologies, s.r.o.
8 
9  Permission is hereby granted, free of charge, to any person obtaining a
10  copy of this software and associated documentation files (the "Software"),
11  to deal in the Software without restriction, including without limitation
12  the rights to use, copy, modify, merge, publish, distribute, sublicense,
13  and/or sell copies of the Software, and to permit persons to whom the
14  Software is furnished to do so, subject to the following conditions:
15 
16  The above copyright notice and this permission notice shall be included in
17  all copies or substantial portions of the Software.
18 
19  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
20  IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
21  FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
22  AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
23  LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
24  FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
25  DEALINGS IN THE SOFTWARE.
26 
27  Note that the Qore library is released under a choice of three open-source
28  licenses: MIT (as above), LGPL 2+, or GPL 2+; see README-LICENSE for more
29  information.
30 */
31 
32 #ifndef _QORE_QORE_SOCKET_PRIVATE_H
33 #define _QORE_QORE_SOCKET_PRIVATE_H
34 
35 #include "qore/AbstractPollState.h"
36 #include "qore/QoreSocket.h"
37 #include "qore/InputStream.h"
38 #include "qore/OutputStream.h"
39 
40 #include "qore/intern/SSLSocketHelper.h"
41 #include "qore/intern/QC_Queue.h"
42 
43 #include <cctype>
44 #include <cctype>
45 #include <cerrno>
46 #include <cstdlib>
47 #include <cstring>
48 #include <strings.h>
49 
50 #include <openssl/ssl.h>
51 #include <openssl/err.h>
52 
53 #if defined HAVE_POLL
54 #include <poll.h>
55 #elif defined HAVE_SYS_SELECT_H
56 #include <sys/select.h>
57 #elif (defined _WIN32 || defined __WIN32__) && ! defined __CYGWIN__
58 #define HAVE_SELECT 1
59 #else
60 #error no async socket I/O APIs available
61 #endif
62 
63 #ifndef DEFAULT_SOCKET_BUFSIZE
64 #define DEFAULT_SOCKET_BUFSIZE (64 * 1024)
65 #endif
66 
67 #ifndef QORE_MAX_HEADER_SIZE
68 #define QORE_MAX_HEADER_SIZE 16384
69 #endif
70 
71 #define CHF_HTTP11 (1 << 0)
72 #define CHF_PROCESS (1 << 1)
73 #define CHF_REQUEST (1 << 2)
74 
75 #ifndef DEFAULT_SOCKET_MIN_THRESHOLD_BYTES
76 #define DEFAULT_SOCKET_MIN_THRESHOLD_BYTES 1024
77 #endif
78 
79 static constexpr int SOCK_POLLIN = (1 << 0);
80 static constexpr int SOCK_POLLOUT = (1 << 1);
81 static constexpr int SOCK_POLLERR = (1 << 2);
82 
83 DLLLOCAL void concat_target(QoreString& str, const struct sockaddr *addr, const char* type = "target");
84 DLLLOCAL int do_read_error(qore_offset_t rc, const char* method_name, int timeout_ms, ExceptionSink* xsink);
85 DLLLOCAL int sock_get_raw_error();
86 DLLLOCAL int sock_get_error();
87 DLLLOCAL void qore_socket_error(ExceptionSink* xsink, const char* err, const char* cdesc, const char* mname = nullptr,
88  const char* host = nullptr, const char* svc = nullptr, const struct sockaddr *addr = nullptr);
89 DLLLOCAL void qore_socket_error_intern(int rc, ExceptionSink* xsink, const char* err, const char* cdesc,
90  const char* mname = nullptr, const char* host = nullptr, const char* svc = nullptr,
91  const struct sockaddr* addr = nullptr);
92 DLLLOCAL void se_in_op(const char* cname, const char* meth, ExceptionSink* xsink);
93 DLLLOCAL void se_in_op_thread(const char* cname, const char* meth, ExceptionSink* xsink);
94 DLLLOCAL void se_not_open(const char* cname, const char* meth, ExceptionSink* xsink, const char* extra = nullptr);
95 DLLLOCAL void se_timeout(const char* cname, const char* meth, int timeout_ms, ExceptionSink* xsink);
96 DLLLOCAL void se_closed(const char* cname, const char* mname, ExceptionSink* xsink);
97 
98 #ifdef _Q_WINDOWS
99 #define GETSOCKOPT_ARG_4 char*
100 #define SETSOCKOPT_ARG_4 const char*
101 #define SHUTDOWN_ARG SD_BOTH
102 #define QORE_INVALID_SOCKET ((int)INVALID_SOCKET)
103 #define QORE_SOCKET_ERROR SOCKET_ERROR
104 DLLLOCAL int check_windows_rc(int rc);
105 
106 #ifndef ECONNRESET
107 #define ECONNRESET WSAECONNRESET
108 #endif
109 
110 #else
111 // UNIX/Cygwin
112 #define GETSOCKOPT_ARG_4 void*
113 #define SETSOCKOPT_ARG_4 void*
114 #define SHUTDOWN_ARG SHUT_RDWR
115 #define QORE_INVALID_SOCKET -1
116 #define QORE_SOCKET_ERROR -1
117 #endif
118 
119 template <typename T>
120 class PrivateDataListHolder {
121 public:
122  DLLLOCAL PrivateDataListHolder(ExceptionSink* xsink) : xsink(xsink) {
123  }
124 
125  DLLLOCAL ~PrivateDataListHolder() {
126  for (auto& i : pd_vec)
127  i->deref(xsink);
128  }
129 
130  DLLLOCAL T* add(const QoreObject* o, qore_classid_t cid) {
131  T* pd = static_cast<T*>(o->getReferencedPrivateData(cid, xsink));
132  if (!pd)
133  return nullptr;
134  pd_vec.push_back(pd);
135  return pd;
136  }
137 
138 private:
139  typedef std::vector<T*> pd_vec_t;
140  pd_vec_t pd_vec;
141  ExceptionSink* xsink;
142 };
143 
144 hashdecl qore_socketsource_private {
145  QoreStringNode* address;
146  QoreStringNode* hostname;
147 
148  DLLLOCAL qore_socketsource_private() : address(0), hostname(0) {
149  }
150 
151  DLLLOCAL ~qore_socketsource_private() {
152  if (address) address->deref();
153  if (hostname) hostname->deref();
154  }
155 
156  DLLLOCAL void setAddress(QoreStringNode* addr) {
157  assert(!address);
158  address = addr;
159  }
160 
161  DLLLOCAL void setAddress(const char* addr) {
162  assert(!address);
163  address = new QoreStringNode(addr);
164  }
165 
166  DLLLOCAL void setHostName(const char* host) {
167  assert(!hostname);
168  hostname = new QoreStringNode(host);
169  }
170 
171  DLLLOCAL void setAll(QoreObject* o, ExceptionSink* xsink) {
172  if (address) {
173  o->setValue("source", address, xsink);
174  address = 0;
175  }
176 
177  if (hostname) {
178  o->setValue("source_host", hostname, xsink);
179  hostname = 0;
180  }
181  }
182 };
183 
184 class OptionalNonBlockingHelper {
185 public:
186  qore_socket_private& sock;
187  ExceptionSink* xsink;
188  bool set;
189 
190  DLLLOCAL OptionalNonBlockingHelper(qore_socket_private& s, bool n_set, ExceptionSink* xs);
191  DLLLOCAL ~OptionalNonBlockingHelper();
192 };
193 
194 class PrivateQoreSocketTimeoutBase {
195 protected:
196  hashdecl qore_socket_private* sock;
197  int64 start;
198 
199 public:
200  DLLLOCAL PrivateQoreSocketTimeoutBase(qore_socket_private* s) : sock(s), start(sock ? q_clock_getmicros() : 0) {
201  }
202 };
203 
204 class PrivateQoreSocketTimeoutHelper : public PrivateQoreSocketTimeoutBase {
205 protected:
206  const char* op;
207 public:
208  DLLLOCAL PrivateQoreSocketTimeoutHelper(qore_socket_private* s, const char* op);
209  DLLLOCAL ~PrivateQoreSocketTimeoutHelper();
210 };
211 
212 class PrivateQoreSocketThroughputHelper : public PrivateQoreSocketTimeoutBase {
213 protected:
214  bool send;
215 public:
216  DLLLOCAL PrivateQoreSocketThroughputHelper(qore_socket_private* s, bool snd);
217  DLLLOCAL ~PrivateQoreSocketThroughputHelper();
218 
219  DLLLOCAL void finalize(int64 bytes);
220 };
221 
222 hashdecl qore_socket_private;
223 
224 hashdecl qore_socket_op_helper {
225 protected:
226  qore_socket_private* s;
227 
228 public:
229  DLLLOCAL qore_socket_op_helper(qore_socket_private* sock);
230  DLLLOCAL ~qore_socket_op_helper();
231 };
232 
233 class SSLSocketHelperHelper {
234 protected:
235  qore_socket_private* s;
236  SSLSocketHelper* ssl;
237  bool context_saved = false;
238 
239 public:
240  DLLLOCAL SSLSocketHelperHelper(qore_socket_private* sock, bool set_thread_context = false);
241 
242  DLLLOCAL ~SSLSocketHelperHelper();
243 
244  DLLLOCAL void error();
245 };
246 
247 constexpr int SCIPS_CONNECT = 0;
248 constexpr int SCIPS_CHECK_CONNECT = 1;
249 
250 class SocketConnectInetPollState : public AbstractPollState {
251 public:
252  DLLLOCAL SocketConnectInetPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* host,
253  const char* service, int family = AF_UNSPEC, int type = SOCK_STREAM, int protocol = 0);
254 
261  DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
262 
263 private:
264  QoreAddrInfo ai;
265  qore_socket_private* sock;
266  std::string host, service;
267  hashdecl addrinfo* p = nullptr;
268  int prt = -1;
269  int state = SCIPS_CONNECT;
270 
271  DLLLOCAL int doConnect(ExceptionSink* xsink);
272 
273  // returns 0 = connected, 1 = try again, -1 = error
274  DLLLOCAL int checkConnection(ExceptionSink* xsink);
275 
277  DLLLOCAL int next(ExceptionSink* xsink);
278 
280  DLLLOCAL int nextIntern(ExceptionSink* xsink);
281 };
282 
283 #ifndef _Q_WINDOWS
284 class SocketConnectUnixPollState : public AbstractPollState {
285 public:
286  DLLLOCAL SocketConnectUnixPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* name,
287  int type = SOCK_STREAM, int protocol = 0);
288 
295  DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
296 
297 private:
298  qore_socket_private* sock;
299  std::string name;
300  hashdecl sockaddr_un addr;
301  int state = SCIPS_CONNECT;
302 
303  DLLLOCAL int doConnect(ExceptionSink* xsink);
304 
305  // returns 0 = connected, 1 = try again, -1 = error
306  DLLLOCAL int checkConnection(ExceptionSink* xsink);
307 };
308 #endif
309 
310 class SocketConnectSslPollState : public AbstractPollState {
311 public:
312  DLLLOCAL SocketConnectSslPollState(ExceptionSink* xsink, qore_socket_private* sock, X509* cert, EVP_PKEY* pkey);
313 
320  DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
321 
322 private:
323  qore_socket_private* sock;
324 
325  // returns 0 = connected, 1 = try again, -1 = error
326  DLLLOCAL int checkConnection(ExceptionSink* xsink);
327 };
328 
329 #if 0
330 class SocketAcceptPollState : public AbstractPollState {
331 public:
332  DLLLOCAL SocketAcceptPollState(ExceptionSink* xsink, qore_socket_private* sock);
333 private:
334  qore_socket_private* sock;
335 };
336 
337 class SocketAcceptSslPollState : public AbstractPollState {
338  DLLLOCAL SocketAcceptSslPollState(ExceptionSink* xsink, qore_socket_private* sock, X509* cert, EVP_PKEY* pkey)
339  : sock(sock) {
340  assert(!sock->ssl);
341  SSLSocketHelperHelper sshh(sock, true);
342 
343  sock->do_start_ssl_event();
344  int rc;
345  if (rc = sock->ssl->setServer("acceptSSL", sock->sock, cert, pkey, xsink)) {
346  sshh.error();
347  assert(*xsink);
348  return;
349  }
350 
351  ssl->startAccept(xsink);
352  }
353 };
354 #endif
355 
356 class SocketSendPollState : public AbstractPollState {
357 public:
358  DLLLOCAL SocketSendPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* data, size_t size);
359 
366  DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
367 
368 private:
369  qore_socket_private* sock;
370  const char* data;
371  size_t size;
372  size_t sent = 0;
373 };
374 
375 class SocketRecvPollState : public AbstractPollState {
376 public:
377  DLLLOCAL SocketRecvPollState(ExceptionSink* xsink, qore_socket_private* sock, size_t size);
378 
385  DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
386 
388  DLLLOCAL virtual QoreValue takeOutput() {
389  QoreValue rv = bin.release();
390  bin = nullptr;
391  return rv;
392  }
393 
394 private:
395  qore_socket_private* sock;
397  size_t size;
398  size_t received = 0;
399 };
400 
401 class SocketRecvUntilBytesPollState : public AbstractPollState {
402 public:
403  DLLLOCAL SocketRecvUntilBytesPollState(ExceptionSink* xsink, qore_socket_private* sock, const char* bytes,
404  size_t size);
405 
412  DLLLOCAL virtual int continuePoll(ExceptionSink* xsink);
413 
415  DLLLOCAL virtual QoreValue takeOutput() {
416  size_t len = bin->size();
417  BinaryNode* rv = new BinaryNode(bin->giveBuffer(), len);
418  bin = nullptr;
419  return rv;
420  }
421 
422 private:
423  qore_socket_private* sock;
424  // we are using QoreStringNode as it has a much better append / concat implementation than BinaryNode
426  const char* bytes;
427  size_t size;
428  size_t matched = 0;
429 
430  DLLLOCAL int doRecv(ExceptionSink* xsink);
431 };
432 
433 
434 hashdecl qore_socket_private {
435  friend class PrivateQoreSocketTimeoutHelper;
436  friend class PrivateQoreSocketThroughputHelper;
437  friend class SocketConnectInetPollState;
438 
439  // for client certificate capture
440  static thread_local qore_socket_private* current_socket;
441 
442  int sock, sfamily, port, stype, sprot;
443 
444  // issue #3558: connection sequence to show when a connection has been reestablished
445  int64 connection_id = 0;
446 
447  const QoreEncoding* enc;
448 
449  std::string socketname;
450  // issue #3053: client target for SNI
451  std::string client_target;
452  SSLSocketHelper* ssl = nullptr;
453  Queue* event_queue = nullptr,
454  * warn_queue = nullptr;
455 
456  // issue #3633: HTTP encoding to assume
457  std::string assume_http_encoding = "ISO-8859-1";
458 
459  // socket buffer for buffered reads
460  char rbuf[DEFAULT_SOCKET_BUFSIZE];
461 
462  // current buffer size
463  size_t buflen = 0,
464  bufoffset = 0;
465 
466  int64 tl_warning_us = 0; // timeout threshold for network action warning in microseconds
467  double tp_warning_bs = 0; // throughput warning threshold in B/s
468  int64 tp_bytes_sent = 0, // throughput: bytes sent
469  tp_bytes_recv = 0, // throughput: bytes received
470  tp_us_sent = 0, // throughput: time sending
471  tp_us_recv = 0, // throughput: time receiving
472  tp_us_min = 0 // throughput: minimum time for transfer to be considered
473  ;
474 
476  QoreValue warn_callback_arg;
478  QoreValue event_arg;
479  bool del = false,
480  http_exp_chunked_body = false,
481  ssl_accept_all_certs = false,
482  ssl_capture_remote_cert = false,
483  event_data = false;
484  int in_op = -1,
485  ssl_verify_mode = SSL_VERIFY_NONE;
486 
487  // issue #3512: the remote certificate captured
488  QoreObject* remote_cert = nullptr;
489 
490  // issue #3818: verbose certificate verification error info
491  QoreStringNode* ssl_err_str = nullptr;
492 
493  DLLLOCAL qore_socket_private(int n_sock = QORE_INVALID_SOCKET, int n_sfamily = AF_UNSPEC,
494  int n_stype = SOCK_STREAM, int n_prot = 0, const QoreEncoding* n_enc = QCS_DEFAULT) :
495  sock(n_sock), sfamily(n_sfamily), port(-1), stype(n_stype), sprot(n_prot), enc(n_enc) {
496  }
497 
498  DLLLOCAL ~qore_socket_private() {
499  close_internal();
500 
501  // must be dereferenced and removed before deleting
502  assert(!event_queue);
503  assert(!warn_queue);
504  }
505 
506  DLLLOCAL bool isOpen() {
507  return sock != QORE_INVALID_SOCKET;
508  }
509 
510  DLLLOCAL int close() {
511  int rc = close_internal();
512  if (in_op >= 0)
513  in_op = -1;
514  if (http_exp_chunked_body)
515  http_exp_chunked_body = false;
516  sfamily = AF_UNSPEC;
517  stype = SOCK_STREAM;
518  sprot = 0;
519 
520  return rc;
521  }
522 
523  DLLLOCAL int close_and_reset() {
524  assert(sock != QORE_INVALID_SOCKET);
525  int rc;
526  while (true) {
527 #ifdef _Q_WINDOWS
528  rc = ::closesocket(sock);
529 #else
530  rc = ::close(sock);
531 #endif
532  // try again if close was interrupted by a signal
533  if (!rc || sock_get_error() != EINTR)
534  break;
535  }
536  //printd(5, "qore_socket_private::close_and_reset(this: %p) close(%d) returned %d\n", this, sock, rc);
537  sock = QORE_INVALID_SOCKET;
538  if (buflen)
539  buflen = 0;
540  if (bufoffset)
541  bufoffset = 0;
542  if (del)
543  del = false;
544  if (port != -1)
545  port = -1;
546  // issue #3053: clear hostname for SNI
547  client_target.clear();
548  return rc;
549  }
550 
551  DLLLOCAL int close_internal() {
552  //printd(5, "qore_socket_private::close_internal(this: %p) sock: %d\n", this, sock);
553  if (ssl_err_str) {
554  ssl_err_str->deref();
555  ssl_err_str = nullptr;
556  }
557  if (remote_cert) {
558  remote_cert->deref(nullptr);
559  remote_cert = nullptr;
560  }
561  if (sock >= 0) {
562  // if an SSL connection has been established, shut it down first
563  if (ssl) {
564  ssl->shutdown();
565  ssl->deref();
566  ssl = nullptr;
567  }
568 
569  if (!socketname.empty()) {
570  if (del)
571  unlink(socketname.c_str());
572  socketname.clear();
573  }
574  do_close_event();
575  // issue #3558: increment the connection sequence here. so the connection sequence is different as soon as
576  // it's closed
577  ++connection_id;
578 
579  return close_and_reset();
580  } else {
581  return 0;
582  }
583  }
584 
585  DLLLOCAL void setAssumedEncoding(const char* str) {
586  assume_http_encoding = str;
587  }
588 
589  DLLLOCAL const char* getAssumedEncoding() const {
590  return assume_http_encoding.c_str();
591  }
592 
593  DLLLOCAL int getSendTimeout() const {
594  hashdecl timeval tv;
595 
596 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
597  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
598  // but the library expects a 32-bit value
599  int size = sizeof(hashdecl timeval);
600 #else
601  socklen_t size = sizeof(hashdecl timeval);
602 #endif
603 
604  if (getsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
605  return -1;
606 
607  return tv.tv_sec * 1000 + tv.tv_usec / 1000;
608  }
609 
610  DLLLOCAL int getRecvTimeout() const {
611  hashdecl timeval tv;
612 
613 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
614  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
615  // but the library expects a 32-bit value
616  int size = sizeof(hashdecl timeval);
617 #else
618  socklen_t size = sizeof(hashdecl timeval);
619 #endif
620 
621  if (getsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, (GETSOCKOPT_ARG_4)&tv, (socklen_t *)&size))
622  return -1;
623 
624  return tv.tv_sec * 1000 + tv.tv_usec / 1000;
625  }
626 
627  DLLLOCAL int getPort() {
628  // if we don't need to find out what port we are, then return current value
629  if (sock == QORE_INVALID_SOCKET || (sfamily != AF_INET && sfamily != AF_INET6) || port > 0)
630  return port;
631 
632  // otherwise find out what port we're connected to
633  hashdecl sockaddr_storage addr;
634 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
635  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
636  int size = sizeof addr;
637 #else
638  socklen_t size = sizeof addr;
639 #endif
640 
641  if (getsockname(sock, (struct sockaddr *)&addr, (socklen_t *)&size) < 0)
642  return -1;
643 
644  port = q_get_port_from_addr((const struct sockaddr *)&addr);
645  return port;
646  }
647 
648  DLLLOCAL static void do_header(const char* key, QoreString& hdr, const QoreValue& v) {
649  switch (v.getType()) {
650  case NT_STRING:
651  hdr.sprintf("%s: %s\r\n", key, v.get<const QoreStringNode>()->c_str());
652  break;
653  case NT_INT:
654  hdr.sprintf("%s: " QLLD "\r\n", key, v.getAsBigInt());
655  break;
656  case NT_FLOAT: {
657  hdr.sprintf("%s: ", key);
658  size_t offset = hdr.size();
659  hdr.sprintf("%f\r\n", v.getAsFloat());
660  // issue 1556: external modules that call setlocale() can change
661  // the decimal point character used here from '.' to ','
662  // only search the double added, QoreString::sprintf() concatenates
663  q_fix_decimal(&hdr, offset);
664  break;
665  }
666  case NT_NUMBER:
667  hdr.sprintf("%s: ", key);
668  v.get<const QoreNumberNode>()->toString(hdr);
669  hdr.concat("\r\n");
670  break;
671  case NT_BOOLEAN:
672  hdr.sprintf("%s: %d\r\n", key, (int)v.getAsBool());
673  break;
674  }
675  }
676 
677  // issue #3879: must add Content-Length if not present, even if there is no message body
680  DLLLOCAL static void do_headers(QoreString& hdr, const QoreHashNode* headers, size_t size, bool addsize = true) {
681  // RFC-2616 4.4 (http://tools.ietf.org/html/rfc2616#section-4.4)
682  // add Content-Length: 0 to headers for responses without a body where there is no transfer-encoding
683  if (headers) {
684  ConstHashIterator hi(headers);
685 
686  while (hi.next()) {
687  const QoreValue v = hi.get();
688  const char* key = hi.getKey();
689  if (addsize && !strcasecmp(key, "transfer-encoding"))
690  addsize = false;
691  if ((addsize || size) && !strcasecmp(key, "content-length")) {
692  // ignore Content-Length given manually
693  continue;
694  }
695  if (v.getType() == NT_LIST) {
696  ConstListIterator li(v.get<const QoreListNode>());
697  while (li.next())
698  do_header(key, hdr, li.getValue());
699  } else
700  do_header(key, hdr, v);
701  }
702  }
703  // add data and content-length header if necessary
704  if (size || addsize) {
705  hdr.sprintf("Content-Length: " QLLD "\r\n", size);
706  //printd(5, "qore_socket_private::do_headers() added Content-Length: " QLLD "\n", size);
707  }
708 
709  hdr.concat("\r\n");
710  }
711 
712  DLLLOCAL int listen(int backlog = 20) {
713  if (sock == QORE_INVALID_SOCKET)
714  return QSE_NOT_OPEN;
715  if (in_op >= 0)
716  return QSE_IN_OP;
717 #ifdef _Q_WINDOWS
718  if (::listen(sock, backlog)) {
719  // set errno
720  sock_get_error();
721  return -1;
722  }
723  return 0;
724 #else
725  return ::listen(sock, backlog);
726 #endif
727  }
728 
729  DLLLOCAL int accept_intern(ExceptionSink* xsink, struct sockaddr *addr, socklen_t *size, int timeout_ms = -1) {
730  //printd(5, "qore_socket_private::accept_intern() to: %d\n", timeout_ms);
731  assert(xsink);
732  while (true) {
733  if (timeout_ms >= 0 && !isDataAvailable(timeout_ms, "accept", xsink)) {
734  if (*xsink)
735  return -1;
736  // do not throw exception here, NOTHING will be returned in Qore on timeout
737  return QSE_TIMEOUT; // -3
738  }
739 
740  int rc = ::accept(sock, addr, size);
741  if (rc != QORE_INVALID_SOCKET)
742  return rc;
743 
744  // retry if interrupted by a signal
745  if (sock_get_error() == EINTR)
746  continue;
747 
748  qore_socket_error(xsink, "SOCKET-ACCEPT-ERROR", "error in accept()", 0, 0, 0, addr);
749  return -1;
750  }
751  }
752 
753  // returns a new socket
754  DLLLOCAL int accept_internal(ExceptionSink* xsink, SocketSource *source, int timeout_ms = -1) {
755  assert(xsink);
756  if (sock == QORE_INVALID_SOCKET) {
757  xsink->raiseException("SOCKET-NOT-OPEN", "socket must be opened, bound, and in a listening state before "
758  "new connections can be accepted");
759  return QSE_NOT_OPEN;
760  }
761  if (in_op >= 0) {
762  if (in_op == q_gettid()) {
763  se_in_op("Socket", "accept", xsink);
764  return QSE_IN_OP;
765  }
766  se_in_op_thread("Socket", "accept", xsink);
767  return QSE_IN_OP_THREAD;
768  }
769 
770  int rc;
771  if (sfamily == AF_UNIX) {
772 #ifdef _Q_WINDOWS
773  xsink->raiseException("SOCKET-ACCEPT-ERROR", "UNIX sockets are not available under Windows");
774  return -1;
775 #else
776  hashdecl sockaddr_un addr_un;
777 
778 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
779  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
780  // but the library expects a 32-bit value
781  int size = sizeof(hashdecl sockaddr_un);
782 #else
783  socklen_t size = sizeof(hashdecl sockaddr_un);
784 #endif
785  rc = accept_intern(xsink, (struct sockaddr *)&addr_un, (socklen_t *)&size, timeout_ms);
786  //printd(1, "qore_socket_private::accept_internal() " QSD " bytes returned\n", size);
787 
788  if (rc >= 0 && source) {
789  QoreStringNode* addr = new QoreStringNode(enc);
790  addr->sprintf("UNIX socket: %s", socketname.c_str());
791  source->priv->setAddress(addr);
792  source->priv->setHostName("localhost");
793  }
794 #endif // windows
795  } else if (sfamily == AF_INET || sfamily == AF_INET6) {
796  hashdecl sockaddr_storage addr_in;
797 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
798  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes
799  // but the library expects a 32-bit value
800  int size = sizeof(addr_in);
801 #else
802  socklen_t size = sizeof(addr_in);
803 #endif
804 
805  rc = accept_intern(xsink, (struct sockaddr *)&addr_in, (socklen_t *)&size, timeout_ms);
806  //printd(1, "qore_socket_private::accept_internal() rc: %d, %d bytes returned\n", rc, size);
807 
808  if (rc >= 0 && source) {
809  char host[NI_MAXHOST + 1];
810  char service[NI_MAXSERV + 1];
811 
812  if (!getnameinfo((struct sockaddr *)&addr_in, qore_get_in_len((struct sockaddr *)&addr_in), host, sizeof(host), service, sizeof(service), NI_NUMERICSERV)) {
813  source->priv->setHostName(host);
814  }
815 
816  // get ipv4 or ipv6 address
817  char ifname[INET6_ADDRSTRLEN];
818  if (inet_ntop(addr_in.ss_family, qore_get_in_addr((struct sockaddr *)&addr_in), ifname, sizeof(ifname))) {
819  //printd(5, "inet_ntop() '%s' host: '%s'\n", ifname, host);
820  source->priv->setAddress(ifname);
821  }
822  }
823  } else {
824  // should not happen
825  xsink->raiseException("SOCKET-ACCEPT-ERROR", "do not know how to accept connections with address family %d", sfamily);
826  rc = -1;
827  }
828  return rc;
829  }
830 
831  DLLLOCAL QoreHashNode* getEvent(int event, int source = QORE_SOURCE_SOCKET) const {
832  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
833  if (event_arg) {
834  h->setKeyValue("arg", event_arg.refSelf(), nullptr);
835  }
836 
837  h->setKeyValue("event", event, nullptr);
838  h->setKeyValue("source", source, nullptr);
839  h->setKeyValue("id", (int64)this, nullptr);
840 
841  return h;
842  }
843 
844  DLLLOCAL void cleanup(ExceptionSink* xsink) {
845  if (event_queue) {
846  // close the socket before the delete message is put on the queue
847  // the socket would be closed anyway in the destructor
848  close_internal();
849 
850  event_queue->pushAndTakeRef(getEvent(QORE_EVENT_DELETED));
851 
852  // deref and remove event queue
853  event_queue->deref(xsink);
854  event_queue = nullptr;
855  }
856  if (warn_queue) {
857  warn_queue->deref(xsink);
858  warn_queue = nullptr;
859  if (warn_callback_arg) {
860  warn_callback_arg.discard(xsink);
861  warn_callback_arg.clear();
862  }
863  }
864  }
865 
866  DLLLOCAL void setEventQueue(ExceptionSink* xsink, Queue* q, QoreValue arg, bool with_data) {
867  if (event_queue) {
868  if (event_arg) {
869  event_arg.discard(xsink);
870  }
871  event_queue->deref(xsink);
872  }
873  event_queue = q;
874  event_arg = arg;
875  event_data = with_data;
876  }
877 
878  DLLLOCAL void do_start_ssl_event() {
879  if (event_queue) {
880  event_queue->pushAndTakeRef(getEvent(QORE_EVENT_START_SSL));
881  }
882  }
883 
884  DLLLOCAL void do_ssl_established_event() {
885  if (event_queue) {
886  QoreHashNode* h = getEvent(QORE_EVENT_SSL_ESTABLISHED);
887  h->setKeyValue("cipher", new QoreStringNode(ssl->getCipherName()), nullptr);
888  h->setKeyValue("cipher_version", new QoreStringNode(ssl->getCipherVersion()), nullptr);
889  event_queue->pushAndTakeRef(h);
890  }
891  }
892 
893  DLLLOCAL void do_connect_event(int af, const struct sockaddr* addr, const char* target, const char* service = nullptr, int prt = -1) {
894  if (event_queue) {
895  QoreHashNode* h = getEvent(QORE_EVENT_CONNECTING);
896  QoreStringNode* str = q_addr_to_string2(addr);
897  if (str) {
898  h->setKeyValue("address", str, nullptr);
899  } else {
900  h->setKeyValue("error", q_strerror(sock_get_error()), nullptr);
901  }
902  q_af_to_hash(af, *h, nullptr);
903  h->setKeyValue("target", new QoreStringNode(target), nullptr);
904  if (service)
905  h->setKeyValue("service", new QoreStringNode(service), nullptr);
906  if (prt != -1)
907  h->setKeyValue("port", prt, nullptr);
908  event_queue->pushAndTakeRef(h);
909  }
910  }
911 
912  DLLLOCAL void do_connected_event() {
913  if (event_queue) {
914  event_queue->pushAndTakeRef(getEvent(QORE_EVENT_CONNECTED));
915  }
916  }
917 
918  DLLLOCAL void do_data_event_intern(int event, int source, const QoreStringNode& str) const {
919  assert(event_queue && event_data && str.size());
920  ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
921  h->setKeyValue("data", str.refSelf(), nullptr);
922  event_queue->pushAndTakeRef(h.release());
923  }
924 
925  DLLLOCAL void do_data_event(int event, int source, const QoreStringNode& str) const {
926  if (event_queue && event_data && str.size()) {
927  do_data_event_intern(event, source, str);
928  }
929  }
930 
931  DLLLOCAL void do_data_event(int event, int source, const BinaryNode& b) const {
932  if (event_queue && event_data && b.size()) {
933  ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
934  h->setKeyValue("data", b.refSelf(), nullptr);
935  event_queue->pushAndTakeRef(h.release());
936  }
937  }
938 
939  DLLLOCAL void do_data_event(int event, int source, const void* data, size_t size) const {
940  if (event_queue && event_data && size) {
941  ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
943  b->append(data, size);
944  h->setKeyValue("data", b.release(), nullptr);
945  event_queue->pushAndTakeRef(h.release());
946  }
947  }
948 
949  DLLLOCAL void do_header_event(int event, int source, const QoreHashNode& hdr) const {
950  if (event_queue && event_data && !hdr.empty()) {
951  ReferenceHolder<QoreHashNode> h(getEvent(event, source), nullptr);
952  h->setKeyValue("headers", hdr.refSelf(), nullptr);
953  event_queue->pushAndTakeRef(h.release());
954  }
955  }
956 
957  DLLLOCAL void do_chunked_read(int event, size_t bytes, size_t total_read, int source) {
958  if (event_queue) {
959  QoreHashNode* h = getEvent(event, source);
960  if (event == QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED)
961  h->setKeyValue("read", bytes, nullptr);
962  else
963  h->setKeyValue("size", bytes, nullptr);
964  h->setKeyValue("total_read", total_read, nullptr);
965  event_queue->pushAndTakeRef(h);
966  }
967  }
968 
969  DLLLOCAL void do_read_http_header(int event, const QoreHashNode* headers, int source) {
970  if (event_queue) {
971  QoreHashNode* h = getEvent(event, source);
972  h->setKeyValue("headers", headers->hashRefSelf(), nullptr);
973  event_queue->pushAndTakeRef(h);
974  }
975  }
976 
977  DLLLOCAL void do_send_http_message_event(const QoreString& str, const QoreHashNode* headers, int source) {
978  if (event_queue) {
979  QoreHashNode* h = getEvent(QORE_EVENT_HTTP_SEND_MESSAGE, source);
980  h->setKeyValue("message", new QoreStringNode(str), nullptr);
981  //printd(5, "do_send_http_message_event() str='%s' headers: %p (%d %s)\n", str.getBuffer(), headers, headers->getType(), headers->getTypeName());
982  h->setKeyValue("headers", headers->copy(), nullptr);
983  event_queue->pushAndTakeRef(h);
984  }
985  }
986 
987  DLLLOCAL void do_close_event() {
988  if (event_queue) {
989  event_queue->pushAndTakeRef(getEvent(QORE_EVENT_CHANNEL_CLOSED));
990  }
991  }
992 
993  DLLLOCAL void do_read_event(size_t bytes_read, size_t total_read, size_t bufsize = 0, int source = QORE_SOURCE_SOCKET) {
994  // post bytes read on event queue, if any
995  if (event_queue) {
996  QoreHashNode* h = getEvent(QORE_EVENT_PACKET_READ, source);
997  h->setKeyValue("read", bytes_read, nullptr);
998  h->setKeyValue("total_read", total_read, nullptr);
999  // set total bytes to read and remaining bytes if bufsize > 0
1000  if (bufsize > 0)
1001  h->setKeyValue("total_to_read", bufsize, nullptr);
1002  event_queue->pushAndTakeRef(h);
1003  }
1004  }
1005 
1006  DLLLOCAL void do_send_event(int bytes_sent, int total_sent, int bufsize) {
1007  // post bytes sent on event queue, if any
1008  if (event_queue) {
1009  QoreHashNode* h = getEvent(QORE_EVENT_PACKET_SENT);
1010  h->setKeyValue("sent", bytes_sent, nullptr);
1011  h->setKeyValue("total_sent", total_sent, nullptr);
1012  h->setKeyValue("total_to_send", bufsize, nullptr);
1013  event_queue->pushAndTakeRef(h);
1014  }
1015  }
1016 
1017  DLLLOCAL void do_resolve_event(const char* host, const char* service = 0) {
1018  // post bytes sent on event queue, if any
1019  if (event_queue) {
1020  QoreHashNode* h = getEvent(QORE_EVENT_HOSTNAME_LOOKUP);
1021  if (host)
1022  h->setKeyValue("name", new QoreStringNode(host), nullptr);
1023  if (service)
1024  h->setKeyValue("service", new QoreStringNode(service), nullptr);
1025  event_queue->pushAndTakeRef(h);
1026  }
1027  }
1028 
1029  DLLLOCAL void do_resolved_event(const struct sockaddr* addr) {
1030  // post bytes sent on event queue, if any
1031  if (event_queue) {
1032  QoreHashNode* h = getEvent(QORE_EVENT_HOSTNAME_RESOLVED);
1033  QoreStringNode* str = q_addr_to_string2(addr);
1034  if (str)
1035  h->setKeyValue("address", str, nullptr);
1036  else
1037  h->setKeyValue("error", q_strerror(sock_get_error()), nullptr);
1038  int prt = q_get_port_from_addr(addr);
1039  if (prt > 0)
1040  h->setKeyValue("port", prt, nullptr);
1041  q_af_to_hash(addr->sa_family, *h, nullptr);
1042  event_queue->pushAndTakeRef(h);
1043  }
1044  }
1045 
1046  DLLLOCAL int64 getObjectIDForEvents() const {
1047  return (int64)this;
1048  }
1049 
1050  DLLLOCAL int connectUNIX(const char* p, int sock_type, int protocol, ExceptionSink* xsink) {
1051  assert(xsink);
1052  assert(p);
1053  QORE_TRACE("connectUNIX()");
1054 
1055 #ifdef _Q_WINDOWS
1056  xsink->raiseException("SOCKET-CONNECTUNIX-ERROR", "UNIX sockets are not available under Windows");
1057  return -1;
1058 #else
1059  // close socket if already open
1060  close();
1061 
1062  printd(5, "qore_socket_private::connectUNIX(%s)\n", p);
1063 
1064  hashdecl sockaddr_un addr;
1065 
1066  addr.sun_family = AF_UNIX;
1067  // copy path and terminate if necessary
1068  strncpy(addr.sun_path, p, sizeof(addr.sun_path) - 1);
1069  addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
1070  if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_SOCKET_ERROR) {
1071  xsink->raiseErrnoException("SOCKET-CONNECT-ERROR", errno, "error connecting to UNIX socket: '%s'", p);
1072  return -1;
1073  }
1074 
1075  do_connect_event(AF_UNIX, (sockaddr*)&addr, p);
1076  while (true) {
1077  if (!::connect(sock, (const sockaddr *)&addr, sizeof(struct sockaddr_un)))
1078  break;
1079 
1080  // try again if we were interrupted by a signal
1081  if (sock_get_error() == EINTR)
1082  continue;
1083 
1084  // otherwise close the socket and return an exception with the error code
1085  // do not have to worry about windows API calls here; this is a UNIX-only function
1086  close_and_reset();
1087  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, p);
1088 
1089  return -1;
1090  }
1091 
1092  // save file name for deleting when socket is closed
1093  socketname = addr.sun_path;
1094  sfamily = AF_UNIX;
1095 
1096  do_connected_event();
1097 
1098  return 0;
1099 #endif // windows
1100  }
1101 
1102  // socket must be open or -1 is returned and a Qore-language exception is raised
1103  /* return values:
1104  -1: error
1105  0: timeout
1106  > 0: I/O can continue
1107  */
1108  DLLLOCAL int asyncIoWait(int timeout_ms, bool read, bool write, const char* cname, const char* mname,
1109  ExceptionSink* xsink) const {
1110  assert(xsink);
1111  assert(read || write);
1112  if (sock == QORE_INVALID_SOCKET) {
1113  se_not_open(cname, mname, xsink, "asyncIoWait");
1114  return -1;
1115  }
1116 
1117  return asyncIoWait(timeout_ms, read, write, xsink);
1118  }
1119 
1120  DLLLOCAL int asyncIoWait(int timeout_ms, bool read, bool write, ExceptionSink* xsink) const {
1121  assert(xsink);
1122 #if defined HAVE_POLL
1123  return poll_intern(xsink, timeout_ms, read, write);
1124 #elif defined HAVE_SELECT
1125  return select_intern(xsink, timeout_ms, read, write);
1126 #else
1127 #error no async socket operations supported
1128 #endif
1129  }
1130 
1131 #if defined HAVE_POLL
1132  DLLLOCAL int poll_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write) const {
1133  int rc;
1134  short arg = 0;
1135  if (read)
1136  arg |= POLLIN;
1137  if (write)
1138  arg |= POLLOUT;
1139  pollfd fds = {sock, arg, 0};
1140  while (true) {
1141  rc = ::poll(&fds, 1, timeout_ms);
1142  if (rc == -1 && errno == EINTR)
1143  continue;
1144  break;
1145  }
1146  if (rc < 0)
1147  qore_socket_error(xsink, "SOCKET-SELECT-ERROR", "poll(2) returned an error");
1148  else if (!rc && ((fds.revents & POLLHUP) || (fds.revents & (POLLERR|POLLNVAL))))
1149  rc = -1;
1150 
1151  return rc;
1152  }
1153 #elif defined HAVE_SELECT
1154  DLLLOCAL int select_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write) const {
1155  bool aborted = false;
1156  int rc = select_intern(xsink, timeout_ms, read, write, aborted);
1157  if (rc != QORE_SOCKET_ERROR && aborted)
1158  rc = -1;
1159  return rc;
1160  }
1161 
1162  DLLLOCAL int select_intern(ExceptionSink* xsink, int timeout_ms, bool read, bool write, bool& aborted) const {
1163  assert(xsink);
1164  assert(!aborted);
1165  // windows does not use FD_SETSIZE to limit the value of the highest socket descriptor in the set
1166  // instead it has a maximum of 64 sockets in the set; we only need one anyway
1167 #ifndef _Q_WINDOWS
1168  // select is inherently broken since it can only handle descriptors < FD_SETSIZE, which is 1024 on Linux for example
1169  if (sock >= FD_SETSIZE) {
1170  xsink->raiseException("SOCKET-SELECT-ERROR", "fd is %d which is >= %d; contact the Qore developers to implement an alternative to select() on this platform", sock, FD_SETSIZE);
1171  return -1;
1172  }
1173 #endif
1174  hashdecl timeval tv;
1175  int rc;
1176  while (true) {
1177  // to be safe, we set the file descriptor arg after each EINTR (required on Linux for example)
1178  fd_set sfs, err;
1179 
1180  FD_ZERO(&sfs);
1181  FD_ZERO(&err);
1182  FD_SET(sock, &sfs);
1183  FD_SET(sock, &err);
1184 
1185  tv.tv_sec = timeout_ms / 1000;
1186  tv.tv_usec = (timeout_ms % 1000) * 1000;
1187 
1188  fd_set* readfd = read ? &sfs : 0;
1189  fd_set* writefd = write ? &sfs : 0;
1190 
1191  rc = select(sock + 1, readfd, writefd, &err, &tv);
1192  //printd(5, "select_intern() rc: %d err: %d\n", rc, FD_ISSET(sock, &err));
1193  if (rc != QORE_SOCKET_ERROR) {
1194  if (FD_ISSET(sock, &err))
1195  aborted = true;
1196  break;
1197  }
1198  if (sock_get_error() != EINTR)
1199  break;
1200  }
1201  if (rc == QORE_SOCKET_ERROR) {
1202  // do not close the socket here, even in case of EBADF, just return an error
1203  rc = 0;
1204  qore_socket_error(xsink, "SOCKET-SELECT-ERROR", "select(2) returned an error");
1205  }
1206 
1207  return rc;
1208  }
1209 #endif
1210 
1211  DLLLOCAL bool tryReadSocketData(const char* mname, ExceptionSink* xsink) {
1212  assert(xsink);
1213  assert(!buflen);
1214  if (!ssl) {
1215  // issue #3564: see if any data is available on the socket
1216  return asyncIoWait(0, true, false, "Socket", mname, xsink);
1217  }
1218  // select can return true if there is protocol negotiation data available,
1219  // so we try to peek 1 byte of application data with a timeout of 0 with the SSL connection
1220  int rc = ssl->doSSLRW(xsink, mname, rbuf, 1, 0, PEEK, false);
1221  if (*xsink || (rc == QSE_TIMEOUT)) {
1222  return false;
1223  }
1224  return rc > 0 ? true : false;
1225  }
1226 
1227  DLLLOCAL bool isSocketDataAvailable(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1228  return asyncIoWait(timeout_ms, true, false, "Socket", mname, xsink);
1229  }
1230 
1231  DLLLOCAL bool isDataAvailable(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1232  if (buflen)
1233  return true;
1234  return isSocketDataAvailable(timeout_ms, mname, xsink);
1235  }
1236 
1237  DLLLOCAL bool isWriteFinished(int timeout_ms, const char* mname, ExceptionSink* xsink) {
1238  return asyncIoWait(timeout_ms, false, true, "Socket", mname, xsink);
1239  }
1240 
1241  DLLLOCAL int close_and_exit() {
1242  if (sock != QORE_INVALID_SOCKET)
1243  close_and_reset();
1244  return -1;
1245  }
1246 
1247  DLLLOCAL int connectINETTimeout(int timeout_ms, const struct sockaddr* ai_addr, size_t ai_addrlen,
1248  ExceptionSink* xsink, bool only_timeout) {
1249  assert(xsink);
1250  PrivateQoreSocketTimeoutHelper toh(this, "connect");
1251 
1252  while (true) {
1253  if (!::connect(sock, ai_addr, ai_addrlen))
1254  return 0;
1255 
1256 #ifdef _Q_WINDOWS
1257  if (sock_get_error() != EAGAIN) {
1258  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, 0, 0, ai_addr);
1259  break;
1260  }
1261 #else
1262  // try again if we were interrupted by a signal
1263  if (errno == EINTR)
1264  continue;
1265 
1266  if (errno != EINPROGRESS)
1267  break;
1268 #endif
1269 
1270  //printd(5, "qore_socket_private::connectINETTimeout() timeout_ms: %d errno: %d\n", timeout_ms, errno);
1271 
1272  // check for timeout or connection with EINPROGRESS
1273  while (true) {
1274 #ifdef _Q_WINDOWS
1275  bool aborted = false;
1276  int rc = select_intern(xsink, timeout_ms, false, true, aborted);
1277 
1278  //printd(5, "qore_socket_private::connectINETTimeout() timeout_ms: %d rc: %d aborted: %d\n",
1279  // timeout_ms, rc, aborted);
1280 
1281  // windows select() returns an error in the error socket set instead of an WSAECONNREFUSED error like
1282  // UNIX, so we simulate it here
1283  if (rc != QORE_SOCKET_ERROR && aborted) {
1284  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, 0, 0, ai_addr);
1285  return -1;
1286  }
1287 #else
1288  int rc = asyncIoWait(timeout_ms, false, true, "Socket", "connectINETTimeout", xsink);
1289 #endif
1290  if (*xsink)
1291  return -1;
1292 
1293  //printd(5, "asyncIoWait(%d) returned %d\n", timeout_ms, rc);
1294  if (rc == QORE_SOCKET_ERROR && sock_get_error() != EINTR) {
1295  if (!only_timeout)
1296  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in asyncIoWait() with "
1297  "Socket::connect() with timeout", 0, 0, 0, ai_addr);
1298  return -1;
1299  } else if (rc > 0) {
1300  return checkConnected(xsink, nullptr, ai_addr, only_timeout);
1301  } else {
1302  SimpleRefHolder<QoreStringNode> desc(new QoreStringNodeMaker("timeout in connection after %dms",
1303  timeout_ms));
1304  concat_target(*(*desc), ai_addr);
1305  xsink->raiseException("SOCKET-CONNECT-ERROR", desc.release());
1306  return -1;
1307  }
1308  }
1309  }
1310 
1311  return -1;
1312  }
1313 
1314  DLLLOCAL int checkConnected(ExceptionSink* xsink, const char* hostsvc, const struct sockaddr* ai_addr = nullptr,
1315  bool only_timeout = false) {
1316  assert(sock);
1317 
1318  // socket selected for write
1319  socklen_t lon = sizeof(int);
1320  int val;
1321 
1322  if (getsockopt(sock, SOL_SOCKET, SO_ERROR, (GETSOCKOPT_ARG_4)(&val), &lon) == QORE_SOCKET_ERROR) {
1323  if (!only_timeout) {
1324  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in getsockopt()", nullptr, hostsvc, nullptr,
1325  ai_addr);
1326  }
1327  return -1;
1328  }
1329 
1330  if (val) {
1331  if (only_timeout) {
1332  errno = val;
1333  return -1;
1334  }
1335  qore_socket_error_intern(val, xsink, "SOCKET-CONNECT-ERROR", "error in getsockopt()", nullptr, hostsvc,
1336  nullptr, ai_addr);
1337  return -1;
1338  }
1339 
1340  // connected successfully within the timeout period
1341  return 0;
1342  }
1343 
1344  DLLLOCAL void confirmConnected(const char* host) {
1345  do_connected_event();
1346 
1347  // issue #3053: save hostname for SNI
1348  if (host) {
1349  client_target = host;
1350  }
1351  }
1352 
1353  DLLLOCAL int sock_errno_err(const char* err, const char* desc, ExceptionSink* xsink) {
1354  sock = QORE_INVALID_SOCKET;
1355  qore_socket_error(xsink, err, desc);
1356  return -1;
1357  }
1358 
1359  DLLLOCAL int set_non_blocking(bool non_blocking, ExceptionSink* xsink) {
1360  assert(xsink);
1361  // ignore call when socket already closed
1362  if (sock == QORE_INVALID_SOCKET) {
1363  assert(!xsink || *xsink);
1364  return -1;
1365  }
1366 
1367 #ifdef _Q_WINDOWS
1368  u_long mode = non_blocking ? 1 : 0;
1369  int rc = ioctlsocket(sock, FIONBIO, &mode);
1370  if (check_windows_rc(rc)) {
1371  return sock_errno_err("SOCKET-CONNECT-ERROR", "error in ioctlsocket(FIONBIO)", xsink);
1372  }
1373 #else
1374  int arg;
1375 
1376  // get socket descriptor status flags
1377  if ((arg = fcntl(sock, F_GETFL, 0)) < 0) {
1378  return sock_errno_err("SOCKET-CONNECT-ERROR", "error in fcntl() getting socket descriptor status "
1379  "flag", xsink);
1380  }
1381 
1382  if (non_blocking) { // set non-blocking
1383  arg |= O_NONBLOCK;
1384  } else { // set blocking
1385  arg &= ~O_NONBLOCK;
1386  }
1387 
1388  if (fcntl(sock, F_SETFL, arg) < 0) {
1389  return sock_errno_err("SOCKET-CONNECT-ERROR", "error in fcntl() setting socket descriptor status "
1390  "flag", xsink);
1391  }
1392 #endif
1393  //printd(5, "qore_socket_private::set_non_blocking() set: %d\n", non_blocking);
1394 
1395  return 0;
1396  }
1397 
1398  DLLLOCAL int connectINET(const char* host, const char* service, int timeout_ms, ExceptionSink* xsink,
1399  int family = AF_UNSPEC, int type = SOCK_STREAM, int protocol = 0) {
1400  assert(xsink);
1401  family = q_get_af(family);
1402  type = q_get_sock_type(type);
1403 
1404  QORE_TRACE("qore_socket_private::connectINET()");
1405 
1406  // close socket if already open
1407  close();
1408 
1409  printd(5, "qore_socket_private::connectINET(%s:%s, %dms)\n", host, service, timeout_ms);
1410 
1411  do_resolve_event(host, service);
1412 
1413  QoreAddrInfo ai;
1414  if (ai.getInfo(xsink, host, service, family, 0, type, protocol))
1415  return -1;
1416 
1417  hashdecl addrinfo* aip = ai.getAddrInfo();
1418 
1419  // emit all "resolved" events
1420  if (event_queue)
1421  for (struct addrinfo* p = aip; p; p = p->ai_next)
1422  do_resolved_event(p->ai_addr);
1423 
1424  int prt = q_get_port_from_addr(aip->ai_addr);
1425 
1426  for (struct addrinfo* p = aip; p; p = p->ai_next) {
1427  if (!connectINETIntern(host, service, p->ai_family, p->ai_addr, p->ai_addrlen, p->ai_socktype,
1428  p->ai_protocol, prt, timeout_ms, xsink, true)) {
1429  return 0;
1430  }
1431  if (*xsink) {
1432  break;
1433  }
1434  }
1435 
1436  if (!*xsink) {
1437  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, host, service);
1438  }
1439  return -1;
1440  }
1441 
1442  DLLLOCAL int connectINETIntern(const char* host, const char* service, int ai_family, struct sockaddr* ai_addr,
1443  size_t ai_addrlen, int ai_socktype, int ai_protocol, int prt, int timeout_ms, ExceptionSink* xsink,
1444  bool only_timeout = false) {
1445  assert(xsink);
1446  printd(5, "qore_socket_private::connectINETIntern() host: %s service: %s family: %d timeout_ms: %d\n", host,
1447  service, ai_family, timeout_ms);
1448  if ((sock = socket(ai_family, ai_socktype, ai_protocol)) == QORE_INVALID_SOCKET) {
1449  xsink->raiseErrnoException("SOCKET-CONNECT-ERROR", errno, "cannot establish a connection to %s:%s", host,
1450  service);
1451  return -1;
1452  }
1453 
1454  //printd(5, "qore_socket_private::connectINETIntern(this: %p, host: '%s', port: %d, timeout_ms: %d) "
1455  // "sock: %d\n", this, host, port, timeout_ms, sock);
1456 
1457  int rc;
1458 
1459  // perform connect with timeout if a non-negative timeout was passed
1460  if (timeout_ms >= 0) {
1461  // set non-blocking
1462  if (set_non_blocking(true, xsink))
1463  return close_and_exit();
1464 
1465  do_connect_event(ai_family, ai_addr, host, service, prt);
1466 
1467  rc = connectINETTimeout(timeout_ms, ai_addr, ai_addrlen, xsink, only_timeout);
1468  //printd(5, "qore_socket_private::connectINETIntern() errno: %d rc: %d, xsink: %d\n", errno, rc, xsink && *xsink);
1469 
1470  // set blocking
1471  if (set_non_blocking(false, xsink))
1472  return close_and_exit();
1473  } else {
1474  do_connect_event(ai_family, ai_addr, host, service, prt);
1475 
1476  while (true) {
1477  rc = ::connect(sock, ai_addr, ai_addrlen);
1478 
1479  // try again if rc == -1 and errno == EINTR
1480  if (!rc || sock_get_error() != EINTR)
1481  break;
1482  }
1483  }
1484 
1485  if (rc < 0) {
1486  if (!only_timeout || errno == ETIMEDOUT)
1487  qore_socket_error(xsink, "SOCKET-CONNECT-ERROR", "error in connect()", 0, host, service);
1488 
1489  return close_and_exit();
1490  }
1491 
1492  sfamily = ai_family;
1493  stype = ai_socktype;
1494  sprot = ai_protocol;
1495  port = prt;
1496  //printd(5, "qore_socket_private::connectINETIntern(this: %p, host='%s', port: %d, timeout_ms: %d) success, "
1497  // "rc: %d, sock: %d\n", this, host, port, timeout_ms, rc, sock);
1498 
1499  confirmConnected(host);
1500  return 0;
1501  }
1502 
1503  DLLLOCAL int upgradeClientToSSLIntern(const char* mname, const char* sni_target_host, X509* cert, EVP_PKEY* pkey,
1504  int timeout_ms, ExceptionSink* xsink) {
1505  assert(!ssl);
1506  SSLSocketHelperHelper sshh(this, true);
1507 
1508  int rc;
1509  do_start_ssl_event();
1510  // issue #3053: send target hostname to support SNI
1511  if (!sni_target_host && !client_target.empty()) {
1512  sni_target_host = client_target.c_str();
1513  }
1514  if ((rc = ssl->setClient(mname, sni_target_host, sock, cert, pkey, xsink)) || ssl->connect(mname, timeout_ms,
1515  xsink)) {
1516  sshh.error();
1517  return rc ? rc : -1;
1518  }
1519  do_ssl_established_event();
1520 
1521  return 0;
1522  }
1523 
1524  DLLLOCAL int upgradeServerToSSLIntern(const char* mname, X509* cert, EVP_PKEY* pkey, int timeout_ms,
1525  ExceptionSink* xsink) {
1526  assert(!ssl);
1527  //printd(5, "qore_socket_private::upgradeServerToSSLIntern() this: %p mode: %d\n", this, ssl_verify_mode);
1528  SSLSocketHelperHelper sshh(this, true);
1529 
1530  do_start_ssl_event();
1531  if (ssl->setServer(mname, sock, cert, pkey, xsink) || ssl->accept(mname, timeout_ms, xsink)) {
1532  sshh.error();
1533  return -1;
1534  }
1535  do_ssl_established_event();
1536 
1537  return 0;
1538  }
1539 
1540  // returns 0 = success, -1 = error
1541  DLLLOCAL int openUNIX(int sock_type = SOCK_STREAM, int protocol = 0) {
1542  if (sock != QORE_INVALID_SOCKET)
1543  close();
1544 
1545  if ((sock = socket(AF_UNIX, sock_type, protocol)) == QORE_INVALID_SOCKET) {
1546  return -1;
1547  }
1548 
1549  sfamily = AF_UNIX;
1550  stype = sock_type;
1551  sprot = protocol;
1552  port = -1;
1553  return 0;
1554  }
1555 
1556  // returns 0 = success, -1 = error
1557  DLLLOCAL int openINET(int family = AF_INET, int sock_type = SOCK_STREAM, int protocol = 0) {
1558  if (sock != QORE_INVALID_SOCKET)
1559  close();
1560 
1561  if ((sock = socket(family, sock_type, protocol)) == QORE_INVALID_SOCKET)
1562  return -1;
1563 
1564  sfamily = family;
1565  stype = sock_type;
1566  sprot = protocol;
1567  port = -1;
1568  return 0;
1569  }
1570 
1571  DLLLOCAL int reuse(int opt) {
1572  //printf("qore_socket_private::reuse(%s)\n", opt ? "true" : "false");
1573  return setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, (SETSOCKOPT_ARG_4)&opt, sizeof(int));
1574  }
1575 
1576  // the only place where xsink is optional
1577  DLLLOCAL int bindIntern(struct sockaddr* ai_addr, size_t ai_addrlen, int prt, bool reuseaddr, ExceptionSink* xsink = 0) {
1578  reuse(reuseaddr);
1579 
1580  if ((::bind(sock, ai_addr, ai_addrlen)) == QORE_SOCKET_ERROR) {
1581  if (xsink)
1582  qore_socket_error(xsink, "SOCKET-BIND-ERROR", "error in bind()", 0, 0, 0, ai_addr);
1583  close();
1584  return -1;
1585  }
1586 
1587  // set port number
1588  if (prt)
1589  port = prt;
1590  else {
1591  // get port number
1592 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1593  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
1594  int len = ai_addrlen;
1595 #else
1596  socklen_t len = ai_addrlen;
1597 #endif
1598 
1599  if (getsockname(sock, ai_addr, &len))
1600  port = -1;
1601  else
1602  port = q_get_port_from_addr(ai_addr);
1603  }
1604  return 0;
1605  }
1606 
1607  // bind to UNIX domain socket file
1608  DLLLOCAL int bindUNIX(ExceptionSink* xsink, const char* name, int socktype = SOCK_STREAM, int protocol = 0) {
1609  assert(xsink);
1610 #ifdef _Q_WINDOWS
1611  xsink->raiseException("SOCKET-BINDUNIX-ERROR", "UNIX sockets are not available under Windows");
1612  return -1;
1613 #else
1614  close();
1615 
1616  // try to open socket if necessary
1617  if (openUNIX(socktype, protocol)) {
1618  xsink->raiseErrnoException("SOCKET-BIND-ERROR", errno, "error opening UNIX socket ('%s') for bind", name);
1619  return -1;
1620  }
1621 
1622  hashdecl sockaddr_un addr;
1623  addr.sun_family = AF_UNIX;
1624  // copy path and terminate if necessary
1625  strncpy(addr.sun_path, name, sizeof(addr.sun_path) - 1);
1626  addr.sun_path[sizeof(addr.sun_path) - 1] = '\0';
1627 
1628  if (bindIntern((sockaddr*)&addr, sizeof(struct sockaddr_un), -1, false, xsink))
1629  return -1;
1630 
1631  // save socket file name for deleting on close
1632  socketname = addr.sun_path;
1633  // delete UNIX domain socket on close
1634  del = true;
1635  return 0;
1636 #endif // windows
1637  }
1638 
1639  DLLLOCAL int bindINET(ExceptionSink* xsink, const char* name, const char* service, bool reuseaddr = true, int family = AF_UNSPEC, int socktype = SOCK_STREAM, int protocol = 0) {
1640  assert(xsink);
1641  family = q_get_af(family);
1642  socktype = q_get_sock_type(socktype);
1643 
1644  close();
1645 
1646  QoreAddrInfo ai;
1647  do_resolve_event(name, service);
1648  if (ai.getInfo(xsink, name, service, family, AI_PASSIVE, socktype, protocol))
1649  return -1;
1650 
1651  hashdecl addrinfo* aip = ai.getAddrInfo();
1652  // first emit all "resolved" events
1653  if (event_queue)
1654  for (struct addrinfo* p = aip; p; p = p->ai_next)
1655  do_resolved_event(p->ai_addr);
1656 
1657  // try to open socket if necessary
1658  if (openINET(aip->ai_family, aip->ai_socktype, protocol)) {
1659  qore_socket_error(xsink, "SOCKET-BINDINET-ERROR", "error opening socket for bind", 0, name, service);
1660  return -1;
1661  }
1662 
1663  int prt = q_get_port_from_addr(aip->ai_addr);
1664 
1665  int en = 0;
1666  // iterate through addresses and bind to the first interface possible
1667  for (struct addrinfo* p = aip; p; p = p->ai_next) {
1668  if (!bindIntern(p->ai_addr, p->ai_addrlen, prt, reuseaddr)) {
1669  //printd(5, "qore_socket_private::bindINET(family: %d) bound: name: %s service: %s f: %d st: %d p: %d\n", family, name ? name : "(null)", service ? service : "(null)", p->ai_family, p->ai_socktype, p->ai_protocol);
1670  return 0;
1671  }
1672 
1673  en = sock_get_raw_error();
1674  //printd(5, "qore_socket_private::bindINET() failed to bind: name: %s service: %s f: %d st: %d p: %d, errno: %d (%s)\n", name ? name : "(null)", service ? service : "(null)", p->ai_family, p->ai_socktype, p->ai_protocol, en, strerror(en));
1675  }
1676 
1677  // if no bind was possible, then raise an exception
1678  qore_socket_error_intern(en, xsink, "SOCKET-BIND-ERROR", "error binding on socket", 0, name, service);
1679  return -1;
1680  }
1681 
1682  // only called from qore-bound code - always with xsink
1683  DLLLOCAL QoreHashNode* getPeerInfo(ExceptionSink* xsink, bool host_lookup = true) const {
1684  assert(xsink);
1685  if (sock == QORE_INVALID_SOCKET) {
1686  se_not_open("Socket", "getPeerInfo", xsink);
1687  return 0;
1688  }
1689 
1690  hashdecl sockaddr_storage addr;
1691  socklen_t len = sizeof addr;
1692  if (getpeername(sock, (struct sockaddr*)&addr, &len)) {
1693  qore_socket_error(xsink, "SOCKET-GETPEERINFO-ERROR", "error in getpeername()");
1694  return 0;
1695  }
1696 
1697  return getAddrInfo(addr, len, host_lookup);
1698  }
1699 
1700  // only called from qore-bound code - always with xsink
1701  DLLLOCAL QoreHashNode* getSocketInfo(ExceptionSink* xsink, bool host_lookup = true) const {
1702  assert(xsink);
1703  if (sock == QORE_INVALID_SOCKET) {
1704  se_not_open("Socket", "getSocketInfo", xsink);
1705  return 0;
1706  }
1707 
1708  hashdecl sockaddr_storage addr;
1709 #if defined(HPUX) && defined(__ia64) && defined(__LP64__)
1710  // on HPUX 64-bit the OS defines socklen_t to be 8 bytes, but the library expects a 32-bit value
1711  int len = sizeof addr;
1712 #else
1713  socklen_t len = sizeof addr;
1714 #endif
1715 
1716  if (getsockname(sock, (struct sockaddr*)&addr, &len)) {
1717  qore_socket_error(xsink, "SOCKET-GETSOCKETINFO-ERROR", "error in getsockname()");
1718  return 0;
1719  }
1720 
1721  return getAddrInfo(addr, len, host_lookup);
1722  }
1723 
1724  DLLLOCAL QoreHashNode* getAddrInfo(const struct sockaddr_storage& addr, socklen_t len, bool host_lookup = true) const {
1725  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
1726 
1727  if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1728  if (host_lookup) {
1729  char host[NI_MAXHOST + 1];
1730 
1731  if (!getnameinfo((struct sockaddr*)&addr, qore_get_in_len((struct sockaddr*)&addr), host, sizeof(host), 0, 0, 0)) {
1732  QoreStringNode* hoststr = new QoreStringNode(host);
1733  h->setKeyValue("hostname", hoststr, 0);
1734  h->setKeyValue("hostname_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, hoststr->getBuffer()), 0);
1735  }
1736  }
1737 
1738  // get ipv4 or ipv6 address
1739  char ifname[INET6_ADDRSTRLEN];
1740  if (inet_ntop(addr.ss_family, qore_get_in_addr((struct sockaddr*)&addr), ifname, sizeof(ifname))) {
1741  QoreStringNode* addrstr = new QoreStringNode(ifname);
1742  h->setKeyValue("address", addrstr, 0);
1743  h->setKeyValue("address_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, addrstr->getBuffer()), 0);
1744  }
1745 
1746  int tport;
1747  if (addr.ss_family == AF_INET) {
1748  hashdecl sockaddr_in* s = (hashdecl sockaddr_in*)&addr;
1749  tport = ntohs(s->sin_port);
1750  } else {
1751  hashdecl sockaddr_in6* s = (hashdecl sockaddr_in6*)&addr;
1752  tport = ntohs(s->sin6_port);
1753  }
1754 
1755  h->setKeyValue("port", tport, 0);
1756  }
1757 #ifndef _Q_WINDOWS
1758  else if (addr.ss_family == AF_UNIX) {
1759  assert(!socketname.empty());
1760  QoreStringNode* addrstr = new QoreStringNode(socketname);
1761  h->setKeyValue("address", addrstr, 0);
1762  h->setKeyValue("address_desc", QoreAddrInfo::getAddressDesc(addr.ss_family, addrstr->getBuffer()), 0);
1763  }
1764 #endif
1765 
1766  h->setKeyValue("family", addr.ss_family, 0);
1767  h->setKeyValue("familystr", new QoreStringNode(QoreAddrInfo::getFamilyName(addr.ss_family)), 0);
1768 
1769  return h;
1770  }
1771 
1772  // set backwards-compatible object members on accept
1773  // to be (hopefully) deleted in a future version of qore
1774  DLLLOCAL void setAccept(QoreObject* o) {
1775  hashdecl sockaddr_storage addr;
1776 
1777  socklen_t len = sizeof addr;
1778  if (getpeername(sock, (struct sockaddr*)&addr, &len))
1779  return;
1780 
1781  if (addr.ss_family == AF_INET || addr.ss_family == AF_INET6) {
1782  // get ipv4 or ipv6 address
1783  char ifname[INET6_ADDRSTRLEN];
1784  if (inet_ntop(addr.ss_family, qore_get_in_addr((struct sockaddr *)&addr), ifname, sizeof(ifname))) {
1785  //printd(5, "inet_ntop() '%s' host: '%s'\n", ifname, host);
1786  o->setValue("source", new QoreStringNode(ifname), 0);
1787  }
1788 
1789  char host[NI_MAXHOST + 1];
1790  if (!getnameinfo((struct sockaddr *)&addr, qore_get_in_len((struct sockaddr *)&addr), host, sizeof(host), 0, 0, 0))
1791  o->setValue("source_host", new QoreStringNode(host), 0);
1792  }
1793 #ifndef _Q_WINDOWS
1794  else if (addr.ss_family == AF_UNIX) {
1795  QoreStringNode* astr = new QoreStringNode(enc);
1796  hashdecl sockaddr_un *addr_un = (hashdecl sockaddr_un *)&addr;
1797  astr->sprintf("UNIX socket: %s", addr_un->sun_path);
1798  o->setValue("source", astr, 0);
1799  o->setValue("source_host", new QoreStringNode("localhost"), 0);
1800  }
1801 #endif
1802  }
1803 
1805 
1810  DLLLOCAL int readByteFromBuffer(char& output) {
1811  // must be checked if open/connected before this function is called
1812  assert(sock != QORE_INVALID_SOCKET);
1813 
1814  // always returned buffered data first
1815  if (!buflen) {
1816  return -1;
1817  }
1818 
1819  output = *(rbuf + bufoffset);
1820  if (buflen == 1) {
1821  buflen = 0;
1822  bufoffset = 0;
1823  } else {
1824  --buflen;
1825  ++bufoffset;
1826  }
1827  return 0;
1828  }
1829 
1830  // buffered reads for high performance
1831  DLLLOCAL qore_offset_t brecv(ExceptionSink* xsink, const char* meth, char*& buf, size_t bs, int flags,
1832  int timeout, bool do_event = true) {
1833  assert(xsink);
1834  // must be checked if open/connected before this function is called
1835  assert(sock != QORE_INVALID_SOCKET);
1836  assert(meth);
1837 
1838  // always returned buffered data first
1839  if (buflen) {
1840  buf = rbuf + bufoffset;
1841  if (buflen <= bs) {
1842  bs = buflen;
1843  buflen = 0;
1844  bufoffset = 0;
1845  } else {
1846  buflen -= bs;
1847  bufoffset += bs;
1848  }
1849  return (qore_offset_t)bs;
1850  }
1851 
1852  // real socket reads are only done when the buffer is empty
1853 
1854  //printd(5, "qore_socket_private::brecv(buf: %p, bs: %d, flags: %d, timeout: %d, do_event: %d) this: %p ssl: %d\n", buf, (int)bs, flags, timeout, (int)do_event, this, ssl);
1855 
1856  qore_offset_t rc;
1857  if (!ssl) {
1858  if (timeout != -1 && !isDataAvailable(timeout, meth, xsink)) {
1859  if (*xsink)
1860  return -1;
1861  se_timeout("Socket", meth, timeout, xsink);
1862 
1863  return QSE_TIMEOUT;
1864  }
1865 
1866  while (true) {
1867 #ifdef DEBUG
1868  errno = 0;
1869 #endif
1870  rc = ::recv(sock, rbuf, DEFAULT_SOCKET_BUFSIZE, flags);
1871  if (rc == QORE_SOCKET_ERROR) {
1872  sock_get_error();
1873  if (errno == EINTR)
1874  continue;
1875 #ifdef ECONNRESET
1876  if (errno == ECONNRESET) {
1877  se_closed("Socket", meth, xsink);
1878  close();
1879  } else
1880 #endif
1881  qore_socket_error(xsink, "SOCKET-RECV-ERROR", "error in recv()", meth);
1882  break;
1883  }
1884  //printd(5, "qore_socket_private::brecv(%d, %p, %ld, %d) rc: %ld errno: %d\n", sock, buf, bs, flags, rc, errno);
1885  // try again if we were interrupted by a signal
1886  if (rc >= 0)
1887  break;
1888  }
1889  } else
1890  rc = ssl->read(meth, rbuf, DEFAULT_SOCKET_BUFSIZE, timeout, xsink);
1891 
1892  //printd(5, "qore_socket_private::brecv(%d, %p, %ld, %d) rc: %ld errno: %d\n", sock, buf, bs, flags, rc, errno);
1893  if (rc > 0) {
1894  buf = rbuf;
1895  assert(!buflen);
1896  assert(!bufoffset);
1897  if (rc > (qore_offset_t)bs) {
1898  buflen = rc - bs;
1899  bufoffset = bs;
1900  rc = bs;
1901  }
1902 
1903  // register event
1904  if (do_event)
1905  do_read_event(rc, rc);
1906  } else {
1907 #ifdef DEBUG
1908  buf = 0;
1909 #endif
1910  if (!rc)
1911  close();
1912  }
1913 
1914  return rc;
1915  }
1916 
1918  DLLLOCAL QoreStringNode* readHTTPData(ExceptionSink* xsink, const char* meth, int timeout, qore_offset_t& rc,
1919  bool exit_early = false) {
1920  assert(xsink);
1921  assert(meth);
1922  if (sock == QORE_INVALID_SOCKET) {
1923  se_not_open("Socket", meth, xsink, "readHTTPData");
1924  rc = QSE_NOT_OPEN;
1925  return 0;
1926  }
1927 
1928  PrivateQoreSocketThroughputHelper th(this, false);
1929 
1930  // state:
1931  // 0 = '\r' received
1932  // 1 = '\r\n' received
1933  // 2 = '\r\n\r' received
1934  // 3 = '\n' received
1935  // read in HHTP header until \r\n\r\n or \n\n from socket
1936  int state = -1;
1937  QoreStringNodeHolder hdr(new QoreStringNode(enc));
1938 
1939  size_t count = 0;
1940 
1941  while (true) {
1942  char* buf;
1943  rc = brecv(xsink, meth, buf, 1, 0, timeout, false);
1944  //printd(5, "qore_socket_private::readHTTPData() this: %p Socket::%s(): rc: " QLLD " read char: %c (%03d) (old state: %d)\n", this, meth, rc, rc > 0 && buf[0] > 31 ? buf[0] : '?', rc > 0 ? buf[0] : 0, state);
1945  if (rc <= 0) {
1946  //printd(5, "qore_socket_private::readHTTPData(timeout: %d) hdr='%s' (len: %d), rc=" QSD ", errno: %d: '%s'\n", timeout, hdr->getBuffer(), hdr->strlen(), rc, errno, strerror(errno));
1947 
1948  if (!*xsink) {
1949  if (!count) {
1950  //printd(5, "qore_socket_private::readHTTPData() this: %p rc: %d count: %d (%d) timeout: %d\n", this, rc, count, hdr->size(), timeout);
1951  se_closed("Socket", meth, xsink);
1952  } else {
1953  xsink->raiseExceptionArg("SOCKET-HTTP-ERROR", hdr.release(), "socket closed on remote end while reading header data after reading " QSD " byte%s", count, count == 1 ? "" : "s");
1954  }
1955  }
1956  return 0;
1957  }
1958  char c = buf[0];
1959  if (++count == QORE_MAX_HEADER_SIZE) {
1960  xsink->raiseException("SOCKET-HTTP-ERROR", "header size cannot exceed " QSD " bytes", count);
1961  return 0;
1962  }
1963 
1964  // check if we can progress to the next state
1965  if (c == '\n') {
1966  if (state == -1) {
1967  state = 3;
1968  continue;
1969  }
1970  if (!state) {
1971  if (exit_early && hdr->empty())
1972  return 0;
1973  state = 1;
1974  continue;
1975  }
1976  assert(state > 0);
1977  break;
1978  } else if (c == '\r') {
1979  if (state == -1) {
1980  state = 0;
1981  continue;
1982  }
1983  if (!state)
1984  break;
1985  if (state == 1) {
1986  state = 2;
1987  continue;
1988  }
1989  }
1990 
1991  if (state != -1) {
1992  switch (state) {
1993  case 0: hdr->concat('\r'); break;
1994  case 1: hdr->concat("\r\n"); break;
1995  case 2: hdr->concat("\r\n\r"); break;
1996  case 3: hdr->concat('\n'); break;
1997  }
1998  state = -1;
1999  }
2000  hdr->concat(c);
2001  }
2002  hdr->concat('\n');
2003 
2004  //printd(5, "qore_socket_private::readHTTPData(timeout: %d) hdr='%s' (%d)\n", timeout, hdr->getBuffer(), hdr->size());
2005 
2006  th.finalize(hdr->size());
2007 
2008  return hdr.release();
2009  }
2010 
2011  DLLLOCAL QoreStringNode* recv(ExceptionSink* xsink, qore_offset_t bufsize, int timeout, qore_offset_t& rc, int source = QORE_SOURCE_SOCKET) {
2012  assert(xsink);
2013  if (sock == QORE_INVALID_SOCKET) {
2014  se_not_open("Socket", "recv", xsink, "recv");
2015  rc = QSE_NOT_OPEN;
2016  return 0;
2017  }
2018  if (in_op >= 0) {
2019  if (in_op == q_gettid()) {
2020  se_in_op("Socket", "recv", xsink);
2021  return 0;
2022  }
2023  se_in_op_thread("Socket", "recv", xsink);
2024  return 0;
2025  }
2026 
2027  PrivateQoreSocketThroughputHelper th(this, false);
2028 
2029  size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
2030 
2031  QoreStringNodeHolder str(new QoreStringNode(enc));
2032 
2033  char* buf;
2034 
2035  while (true) {
2036  rc = brecv(xsink, "recv", buf, bs, 0, timeout, false);
2037 
2038  if (rc <= 0) {
2039  printd(5, "qore_socket_private::recv(" QSD ", %d) bs=" QSD ", br=" QSD ", rc=" QSD ", errno: %d (%s)\n", bufsize, timeout, bs, str->size(), rc, errno, strerror(errno));
2040  break;
2041  }
2042 
2043  str->concat(buf, rc);
2044 
2045  // register event
2046  if (source > 0) {
2047  do_read_event(rc, str->size(), bufsize, source);
2048  }
2049 
2050  if (bufsize > 0) {
2051  if (str->size() >= (size_t)bufsize)
2052  break;
2053  if ((bufsize - str->size()) < bs)
2054  bs = bufsize - str->size();
2055  }
2056  }
2057 
2058  printd(5, "qore_socket_private::recv() received " QSD " byte(s), bufsize=" QSD ", strlen=" QSD " str='%s'\n", str->size(), bufsize, (str ? str->strlen() : 0), str ? str->getBuffer() : "n/a");
2059 
2060  // "fix" return code value if no error occurred
2061  if (rc >= 0)
2062  rc = str->size();
2063 
2064  th.finalize(str->size());
2065 
2066  if (*xsink) {
2067  return nullptr;
2068  }
2069 
2070  if (source > 0) {
2071  do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **str);
2072  }
2073  return str.release();
2074  }
2075 
2076  DLLLOCAL QoreStringNode* recvAll(ExceptionSink* xsink, int timeout, qore_offset_t& rc, int source = QORE_SOURCE_SOCKET) {
2077  assert(xsink);
2078  if (sock == QORE_INVALID_SOCKET) {
2079  se_not_open("Socket", "recv", xsink, "recvAll");
2080  rc = QSE_NOT_OPEN;
2081  return 0;
2082  }
2083  if (in_op >= 0) {
2084  if (in_op == q_gettid()) {
2085  se_in_op("Socket", "recv", xsink);
2086  return 0;
2087  }
2088  se_in_op_thread("Socket", "recv", xsink);
2089  return 0;
2090  }
2091 
2092  PrivateQoreSocketThroughputHelper th(this, false);
2093 
2094  QoreStringNodeHolder str(new QoreStringNode(enc));
2095 
2096  // perform first read with timeout
2097  char* buf;
2098  rc = brecv(xsink, "recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout, false);
2099  if (rc <= 0)
2100  return 0;
2101 
2102  str->concat(buf, rc);
2103 
2104  // register event
2105  do_read_event(rc, rc);
2106 
2107  // keep reading data until no more data is available without a timeout
2108  if (isDataAvailable(0, "recv", xsink)) {
2109  do {
2110  rc = brecv(xsink, "recv", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0, false);
2111  //printd(5, "qore_socket_private::recv(to: %d) rc=" QSD " rd=" QSD "\n", timeout, rc, str->size());
2112  // if the remote end has closed the connection, return what we have
2113  if (!rc)
2114  break;
2115  if (rc < 0) {
2116  th.finalize(str->size());
2117  return 0;
2118  }
2119  str->concat(buf, rc);
2120 
2121  // register event
2122  do_read_event(rc, str->size());
2123  } while (isDataAvailable(0, "recv", xsink));
2124  }
2125 
2126  th.finalize(str->size());
2127 
2128  if (*xsink) {
2129  return nullptr;
2130  }
2131 
2132  rc = str->size();
2133  if (source > 0) {
2134  do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **str);
2135  }
2136  return str.release();
2137  }
2138 
2139  DLLLOCAL int recv(int fd, qore_offset_t size, int timeout_ms, ExceptionSink* xsink);
2140 
2141  DLLLOCAL BinaryNode* recvBinary(ExceptionSink* xsink, qore_offset_t bufsize, int timeout, qore_offset_t& rc, int source = QORE_SOURCE_SOCKET) {
2142  assert(xsink);
2143  if (sock == QORE_INVALID_SOCKET) {
2144  se_not_open("Socket", "recvBinary", xsink, "recvBinary");
2145  rc = QSE_NOT_OPEN;
2146  return 0;
2147  }
2148  if (in_op >= 0) {
2149  if (in_op == q_gettid()) {
2150  se_in_op("Socket", "recvBinary", xsink);
2151  return 0;
2152  }
2153  se_in_op_thread("Socket", "recvBinary", xsink);
2154  return 0;
2155  }
2156 
2157  PrivateQoreSocketThroughputHelper th(this, false);
2158 
2159  size_t bs = bufsize > 0 && bufsize < DEFAULT_SOCKET_BUFSIZE ? bufsize : DEFAULT_SOCKET_BUFSIZE;
2160 
2162 
2163  char* buf;
2164  while (true) {
2165  rc = brecv(xsink, "recvBinary", buf, bs, 0, timeout);
2166  if (rc <= 0)
2167  break;
2168 
2169  b->append(buf, rc);
2170 
2171  if (bufsize > 0) {
2172  if (b->size() >= (size_t)bufsize)
2173  break;
2174  if ((bufsize - b->size()) < bs)
2175  bs = bufsize - b->size();
2176  }
2177  }
2178 
2179  th.finalize(b->size());
2180 
2181  if (*xsink)
2182  return nullptr;
2183 
2184  // "fix" return code value if no error occurred
2185  if (rc >= 0)
2186  rc = b->size();
2187 
2188  if (source > 0) {
2189  do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **b);
2190  }
2191  printd(5, "qore_socket_private::recvBinary() received " QSD " byte(s), bufsize=" QSD ", blen=" QSD "\n", b->size(), bufsize, b->size());
2192  return b.release();
2193  }
2194 
2195  DLLLOCAL BinaryNode* recvBinaryAll(ExceptionSink* xsink, int timeout, qore_offset_t& rc, int source = QORE_SOURCE_SOCKET) {
2196  assert(xsink);
2197  if (sock == QORE_INVALID_SOCKET) {
2198  se_not_open("Socket", "recvBinary", xsink, "recvBinaryAll");
2199  rc = QSE_NOT_OPEN;
2200  return 0;
2201  }
2202  if (in_op >= 0) {
2203  if (in_op == q_gettid()) {
2204  se_in_op("Socket", "recvBinary", xsink);
2205  return 0;
2206  }
2207  se_in_op_thread("Socket", "recvBinary", xsink);
2208  return 0;
2209  }
2210 
2211  PrivateQoreSocketThroughputHelper th(this, false);
2212 
2214 
2215  //printd(5, "QoreSocket::recvBinary(%d, " QSD ") this: %p\n", timeout, rc, this);
2216  // perform first read with timeout
2217  char* buf;
2218  rc = brecv(xsink, "recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, timeout, false);
2219  if (rc <= 0)
2220  return 0;
2221 
2222  b->append(buf, rc);
2223 
2224  // register event
2225  do_read_event(rc, rc);
2226 
2227  // keep reading data until no more data is available without a timeout
2228  if (isDataAvailable(0, "recvBinary", xsink)) {
2229  do {
2230  rc = brecv(xsink, "recvBinary", buf, DEFAULT_SOCKET_BUFSIZE, 0, 0, false);
2231  // if the remote end has closed the connection, return what we have
2232  if (!rc)
2233  break;
2234  if (rc < 0) {
2235  th.finalize(b->size());
2236  return 0;
2237  }
2238 
2239  b->append(buf, rc);
2240 
2241  // register event
2242  do_read_event(rc, b->size());
2243  } while (isDataAvailable(0, "recvBinary", xsink));
2244  }
2245 
2246  th.finalize(b->size());
2247 
2248  if (*xsink)
2249  return nullptr;
2250 
2251  if (source > 0) {
2252  do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, **b);
2253  }
2254  rc = b->size();
2255  //printd(5, "qore_socket_private() this: %p b: %p size: %lld\n", this, b->getPtr(), rc);
2256  return b.release();
2257  }
2258 
2259  DLLLOCAL void recvToOutputStream(OutputStream *os, int64 size, int64 timeout, ExceptionSink *xsink, QoreThreadLock* l, int source = QORE_SOURCE_SOCKET) {
2260  if (sock == QORE_INVALID_SOCKET) {
2261  se_not_open("Socket", "recvToOutputStream", xsink);
2262  return;
2263  }
2264  if (in_op >= 0) {
2265  if (in_op == q_gettid()) {
2266  se_in_op("Socket", "recvToOutputStream", xsink);
2267  return;
2268  }
2269  se_in_op_thread("Socket", "recvToOutputStream", xsink);
2270  return;
2271  }
2272 
2273  qore_socket_op_helper oh(this);
2274 
2275  char* buf;
2276  qore_offset_t br = 0;
2277  while (size < 0 || br < size) {
2278  // calculate bytes needed
2279  int bn = size < 0 ? DEFAULT_SOCKET_BUFSIZE : QORE_MIN(size - br, DEFAULT_SOCKET_BUFSIZE);
2280 
2281  qore_offset_t rc = brecv(xsink, "recvToOutputStream", buf, bn, 0, timeout);
2282  if (rc < 0) {
2283  //error - already reported in xsink
2284  return;
2285  }
2286  if (rc == 0) {
2287  //eof
2288  if (size >= 0) {
2289  //not all size bytes were read
2290  xsink->raiseException("SOCKET-RECV-ERROR", "Unexpected end of stream");
2291  }
2292  return;
2293  }
2294 
2295  if (source > 0) {
2296  do_data_event(QORE_EVENT_SOCKET_DATA_READ, source, buf, rc);
2297  }
2298 
2299  // write buffer to the stream
2300  {
2301  AutoUnlocker al(l);
2302  os->write(buf, rc, xsink);
2303  if (*xsink) {
2304  return;
2305  }
2306  }
2307 
2308  br += rc;
2309  }
2310  }
2311 
2312  DLLLOCAL QoreStringNode* readHTTPHeaderString(ExceptionSink* xsink, int timeout, int source) {
2313  assert(xsink);
2314  qore_offset_t rc;
2315  QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPHeaderString", timeout, rc));
2316  if (!hdr) {
2317  assert(*xsink);
2318  return 0;
2319  }
2320  assert(rc > 0);
2321  do_data_event(QORE_EVENT_HTTP_HEADERS_READ, source, **hdr);
2322  return hdr.release();
2323  }
2324 
2325  DLLLOCAL QoreHashNode* readHTTPHeader(ExceptionSink* xsink, QoreHashNode* info, int timeout,
2326  qore_offset_t& rc, int source, const char* headers_raw_key = "headers-raw") {
2327  assert(xsink);
2328  QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPHeader", timeout, rc));
2329  if (!hdr) {
2330  assert(*xsink);
2331  return nullptr;
2332  }
2333 
2334  if (hdr->empty()) {
2335  xsink->raiseException("SOCKET-HTTP-ERROR", "remote closed the connection while reading the HTTP header");
2336  return nullptr;
2337  }
2338  assert(rc > 0);
2339 
2340  return processHttpHeaderString(xsink, hdr, info, source, headers_raw_key);
2341  }
2342 
2344  DLLLOCAL QoreHashNode* processHttpHeaderString(ExceptionSink* xsink, QoreStringNodeHolder& hdr,
2345  QoreHashNode* info, int source, const char* headers_raw_key = "headers-raw") {
2346  const char* buf = hdr->c_str();
2347  char* p;
2348  if ((p = (char*)strstr(buf, "\r\n"))) {
2349  *p = '\0';
2350  p += 2;
2351  } else if ((p = (char*)strchr(buf, '\n'))) {
2352  *p = '\0';
2353  ++p;
2354  } else if ((p = (char*)strchr(buf, '\r'))) {
2355  *p = '\0';
2356  ++p;
2357  } else {
2358  // readHTTPData will only return a string that satisifies one of the above conditions,
2359  // however an embedded 0 could have been sent which would make the above searches invalid
2360  xsink->raiseException("SOCKET-HTTP-ERROR", "invalid header received with embedded nulls in "
2361  "Socket::readHTTPHeader()");
2362  return nullptr;
2363  }
2364 
2365  char* t1;
2366  if (!(t1 = (char*)strstr(buf, "HTTP/"))) {
2367  xsink->raiseExceptionArg("SOCKET-HTTP-ERROR", hdr.release(), "missing HTTP version string in "
2368  "first header line in Socket::readHTTPHeader()");
2369  return nullptr;
2370  }
2371 
2372  ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
2373 
2374  // process header flags
2375  int flags = CHF_PROCESS;
2376 
2377  // get version
2378  {
2379  QoreStringNode* hv = new QoreStringNode(t1 + 5, 3, enc);
2380  h->setKeyValue("http_version", hv, nullptr);
2381  if (*hv == "1.1") {
2382  flags |= CHF_HTTP11;
2383  }
2384  }
2385 
2386  // if we are getting a response
2387  // key for info if applicable
2388  const char* info_key;
2389  if (t1 == buf) {
2390  char* t2 = (char*)strchr(buf + 8, ' ');
2391  if (t2) {
2392  t2++;
2393  if (isdigit(*(t2))) {
2394  h->setKeyValue("status_code", atoi(t2), nullptr);
2395  if (strlen(t2) > 4) {
2396  h->setKeyValue("status_message", new QoreStringNode(t2 + 4), nullptr);
2397  }
2398  }
2399  }
2400  // write the status line as the "response-uri" key in the info hash if present
2401  // NOTE: this is not a URI, so the name is not really appropriate
2402  info_key = "response-uri";
2403  } else { // get method and path
2404  char* t2 = (char*)strchr(buf, ' ');
2405  if (t2) {
2406  *t2 = '\0';
2407  h->setKeyValue("method", new QoreStringNode(buf), nullptr);
2408  t2++;
2409  t1 = strchr(t2, ' ');
2410  if (t1) {
2411  *t1 = '\0';
2412  //printd(5, "found path '%s'\n", t2);
2413  // the path is returned as-is with no decodings - use decode_url() to decode
2414  h->setKeyValue("path", new QoreStringNode(t2, enc), nullptr);
2415  }
2416  }
2417  info_key = "request-uri";
2418  flags |= CHF_REQUEST;
2419  }
2420 
2421  // write status line or request line to the info hash and raise a data event if applicable
2422  if (info || (event_queue && event_data)) {
2423  QoreStringNodeHolder status_line(new QoreStringNode(buf));
2424  if (info && event_queue && event_data) {
2425  status_line->ref();
2426  }
2427  if (event_queue && event_data) {
2428  do_data_event_intern(QORE_EVENT_SOCKET_DATA_READ, source, **status_line);
2429  }
2430  if (info) {
2431  info->setKeyValue(info_key, *status_line, nullptr);
2432  }
2433  status_line.release();
2434  }
2435 
2436  bool close = convertHeaderToHash(*h, p, flags, info, &http_exp_chunked_body, headers_raw_key);
2437  do_read_http_header(QORE_EVENT_HTTP_MESSAGE_RECEIVED, *h, source);
2438 
2439  // process header info
2440  if ((flags & CHF_REQUEST) && info) {
2441  info->setKeyValue("close", close, 0);
2442  }
2443 
2444  return h.release();
2445  }
2446 
2447  // info must be already referenced for the assignment, if present
2448  DLLLOCAL int runHeaderCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2449  const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const QoreHashNode* hdr, QoreHashNode* info,
2450  bool send_aborted = false, QoreObject* obj = nullptr) {
2451  assert(xsink);
2452  assert(obj);
2453  ReferenceHolder<QoreListNode> args(new QoreListNode(autoTypeInfo), xsink);
2454  QoreHashNode* arg = new QoreHashNode(autoTypeInfo);
2455  arg->setKeyValue("hdr", hdr ? hdr->refSelf() : nullptr, xsink);
2456  arg->setKeyValue("info", info, xsink);
2457  if (obj)
2458  arg->setKeyValue("obj", obj->refSelf(), xsink);
2459  arg->setKeyValue("send_aborted", send_aborted, xsink);
2460  args->push(arg, nullptr);
2461 
2462  ValueHolder rv(xsink);
2463  return runCallback(xsink, cname, mname, rv, callback, l, *args);
2464  }
2465 
2466  DLLLOCAL int runTrailerCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2468  ValueHolder rv(xsink);
2469  if (runCallback(xsink, cname, mname, rv, callback, l, nullptr))
2470  return -1;
2471 
2472  switch (rv->getType()) {
2473  case NT_NOTHING:
2474  break;
2475  case NT_HASH: {
2476  hdr = rv.release().get<QoreHashNode>();
2477  break;
2478  }
2479  default:
2480  xsink->raiseException("HTTP-TRAILER-ERROR", "chunked callback returned type '%s'; expecting 'hash' "
2481  "or 'NOTHING'", rv->getTypeName());
2482  return -1;
2483  }
2484  return 0;
2485  }
2486 
2487  DLLLOCAL int runDataCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2488  const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const AbstractQoreNode* data, bool chunked) {
2489  assert(xsink);
2490  ReferenceHolder<QoreListNode> args(new QoreListNode(autoTypeInfo), xsink);
2491  QoreHashNode* arg = new QoreHashNode(autoTypeInfo);
2492  arg->setKeyValue("data", data->realCopy(), xsink);
2493  arg->setKeyValue("chunked", chunked, xsink);
2494  args->push(arg, nullptr);
2495 
2496  ValueHolder rv(xsink);
2497  return runCallback(xsink, cname, mname, rv, callback, l, *args);
2498  }
2499 
2500  DLLLOCAL int runCallback(ExceptionSink* xsink, const char* cname, const char* mname, ValueHolder& res,
2501  const ResolvedCallReferenceNode& callback, QoreThreadLock* l, const QoreListNode* args = nullptr) {
2502  assert(xsink);
2503  // FIXME: subtract callback execution time from socket performance measurement
2504 
2505  // unlock and execute callback
2506  {
2507  AutoUnlocker al(l);
2508  res = callback.execValue(args, xsink);
2509  }
2510 
2511  // check exception and socket status
2512  assert(xsink);
2513  if (*xsink)
2514  return -1;
2515 
2516  if (sock == QORE_INVALID_SOCKET) {
2517  se_not_open(cname, mname, xsink, "runCallback");
2518  return QSE_NOT_OPEN;
2519  }
2520 
2521  return 0;
2522  }
2523 
2524  DLLLOCAL int sendHttpChunkedWithCallback(ExceptionSink* xsink, const char* cname, const char* mname,
2525  const ResolvedCallReferenceNode& send_callback, QoreThreadLock& l, int source, int timeout_ms = -1,
2526  bool* aborted = nullptr) {
2527  assert(xsink);
2528  assert(!aborted || !(*aborted));
2529 
2530  if (sock == QORE_INVALID_SOCKET) {
2531  se_not_open(cname, mname, xsink, "sendHttpChunkedWithCallback");
2532  return QSE_NOT_OPEN;
2533  }
2534  if (in_op >= 0) {
2535  if (in_op == q_gettid()) {
2536  se_in_op(cname, mname, xsink);
2537  return 0;
2538  }
2539  se_in_op_thread(cname, mname, xsink);
2540  return 0;
2541  }
2542 
2543  PrivateQoreSocketThroughputHelper th(this, true);
2544 
2545  // set the non-blocking flag (for use with non-ssl connections)
2546  bool nb = (timeout_ms >= 0);
2547  // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2548  OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2549  if (*xsink)
2550  return -1;
2551 
2552  qore_socket_op_helper oh(this);
2553 
2554  qore_offset_t rc;
2555  int64 total = 0;
2556  bool done = false;
2557 
2558  while (!done) {
2559  // if we have response data already, then we assume an error and abort
2560  if (aborted) {
2561  bool data_available = tryReadSocketData(mname, xsink);
2562  //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p aborted: %p iDA: %d\n", this, aborted, data_available);
2563  if (data_available || *xsink) {
2564  *aborted = true;
2565  return *xsink ? -1 : 0;
2566  }
2567  }
2568 
2569  // FIXME: subtract callback execution time from socket performance measurement
2570  ValueHolder res(xsink);
2571  rc = runCallback(xsink, cname, mname, res, send_callback, &l);
2572  if (rc)
2573  return rc;
2574 
2575  //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p res: %s\n", this, get_type_name(*res));
2576 
2577  // check callback return val
2578  QoreString buf;
2579  // do not copy data here; set references and send the data directly
2580  const char* data_ptr = nullptr;
2581  size_t data_size = 0;
2582 
2583  switch (res->getType()) {
2584  case NT_STRING: {
2585  const QoreStringNode* str = res->get<const QoreStringNode>();
2586  if (str->empty()) {
2587  done = true;
2588  break;
2589  }
2590  buf.sprintf("%x\r\n", (int)str->size());
2591  data_ptr = str->c_str();
2592  data_size = str->size();
2593  //buf.concat(str->c_str(), str->size());
2594  break;
2595  }
2596 
2597  case NT_BINARY: {
2598  const BinaryNode* b = res->get<const BinaryNode>();
2599  if (b->empty()) {
2600  done = true;
2601  break;
2602  }
2603  buf.sprintf("%x\r\n", (int)b->size());
2604  data_ptr = static_cast<const char*>(b->getPtr());
2605  data_size = b->size();
2606  //buf.concat((const char*)b->getPtr(), b->size());
2607  break;
2608  }
2609 
2610  case NT_HASH: {
2611  buf.concat("0\r\n");
2612 
2613  const QoreHashNode* h = res->get<const QoreHashNode>();
2614  ConstHashIterator hi(h);
2615  while (hi.next()) {
2616  const QoreValue v = hi.get();
2617  const char* key = hi.getKey();
2618 
2619  //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p trailer %s\n", this, key);
2620 
2621  if (v.getType() == NT_LIST) {
2622  ConstListIterator li(v.get<const QoreListNode>());
2623  while (li.next())
2624  do_header(key, buf, li.getValue());
2625  } else
2626  do_header(key, buf, v);
2627  }
2628  // fall through to next case
2629  }
2630 
2631  case NT_NOTHING:
2632  case NT_NULL:
2633  done = true;
2634  break;
2635 
2636  default:
2637  xsink->raiseException("SOCKET-CALLBACK-ERROR", "HTTP chunked data callback returned type '%s'; expecting one of: 'string', 'binary', 'hash', 'nothing' (or 'NULL')", res->getTypeName());
2638  return -1;
2639  }
2640 
2641  // send chunk buffer data
2642  if (!buf.empty()) {
2643  rc = sendIntern(xsink, cname, mname, buf.c_str(), buf.size(), timeout_ms, total, true);
2644  }
2645 
2646  if (!*xsink) {
2647  assert(rc >= 0);
2648  // send actual data, if available
2649  if (data_ptr && data_size) {
2650  rc = sendIntern(xsink, cname, mname, data_ptr, data_size, timeout_ms, total, true);
2651  }
2652 
2653  if (!*xsink) {
2654  assert(rc >= 0);
2655  if (buf.empty() && (!data_ptr || !data_size)) {
2656  buf.set("0\r\n\r\n");
2657  } else {
2658  buf.set("\r\n");
2659  }
2660  rc = sendIntern(xsink, cname, mname, buf.c_str(), buf.size(), timeout_ms, total, true);
2661  }
2662  }
2663 
2664  if (!*xsink) {
2665  // do events
2666  switch (res->getType()) {
2667  case NT_STRING: {
2668  const QoreStringNode* str = res->get<const QoreStringNode>();
2669  if (!str->empty()) {
2670  do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, source, *str);
2671  }
2672  break;
2673  }
2674 
2675  case NT_BINARY: {
2676  const BinaryNode* b = res->get<const BinaryNode>();
2677  if (!b->empty()) {
2678  do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, source, *b);
2679  }
2680  break;
2681  }
2682 
2683  case NT_HASH: {
2684  const QoreHashNode* h = res->get<const QoreHashNode>();
2685  do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, source, *h);
2686  break;
2687  }
2688  }
2689  }
2690 
2691  if (rc < 0) {
2692  // if we have a socket I/O error, but also data to be read on the socket, then clear the exception and return 0
2693  if (aborted && *xsink) {
2694  bool data_available = tryReadSocketData(mname, xsink);
2695  //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p aborted: %p iDA: %d\n", this, aborted, data_available);
2696  if (data_available) {
2697  *aborted = true;
2698  return *xsink ? -1 : 0;
2699  }
2700  }
2701 
2702  //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p rc: %d sock: %d xsink: %d\n", this, rc, sock, xsink->isException());
2703  }
2704 
2705  //printd(5, "qore_socket_private::sendHttpChunkedWithCallback() this: %p sent: %s\n", this, buf.getBuffer());
2706 
2707  if (rc < 0 || sock == QORE_INVALID_SOCKET)
2708  break;
2709  }
2710 
2711  th.finalize(total);
2712 
2713  return rc < 0 || sock == QORE_INVALID_SOCKET ? -1 : 0;
2714  }
2715 
2716  DLLLOCAL int sendIntern(ExceptionSink* xsink, const char* cname, const char* mname, const char* buf, size_t size,
2717  int timeout_ms, int64& total, bool stream = false) {
2718  assert(xsink);
2719  qore_offset_t rc;
2720  size_t bs = 0;
2721 
2722  // set the non-blocking flag (for use with non-ssl connections)
2723  bool nb = (timeout_ms >= 0);
2724 
2725  while (true) {
2726  if (ssl) {
2727  // SSL_MODE_ENABLE_PARTIAL_WRITE is enabled so we can get finer-grained socket events for do_send_event() below
2728  rc = ssl->write(mname, buf + bs, size - bs, timeout_ms, xsink);
2729  } else {
2730  while (true) {
2731  rc = ::send(sock, buf + bs, size - bs, 0);
2732  //printd(5, "qore_socket_private::send() this: %p Socket::%s() buf: %p size: " QLLD " timeout_ms: %d ssl: %p nb: %d bs: " QLLD " rc: " QLLD "\n", this, mname, buf, size, timeout_ms, ssl, nb, bs, rc);
2733  // try again if we were interrupted by a signal
2734  if (rc >= 0)
2735  break;
2736  sock_get_error();
2737  // check that the send finishes before the timeout if we are using non-blocking I/O
2738  if (nb && (errno == EAGAIN
2739 #ifdef EWOULDBLOCK
2740  || errno == EWOULDBLOCK
2741 #endif
2742  )) {
2743  if (!isWriteFinished(timeout_ms, mname, xsink)) {
2744  if (*xsink)
2745  return -1;
2746  se_timeout("Socket", mname, timeout_ms, xsink);
2747  rc = QSE_TIMEOUT;
2748  break;
2749  }
2750  continue;
2751  }
2752  if (errno != EINTR) {
2753  //printd(5, "qore_socket_private::send() bs: %ld rc: " QSD " len: " QSD " (total: " QSD ") errno: %d sock: %d\n", bs, rc, size - bs, size, errno, sock);
2754  xsink->raiseErrnoException("SOCKET-SEND-ERROR", errno, "error while executing %s::%s()", cname, mname);
2755 
2756  // do not close the socket even if we have EPIPE or ECONNRESET in case there is data to be read when streaming
2757 #ifdef EPIPE
2758  if (!stream && errno == EPIPE)
2759  close();
2760 #endif
2761 #ifdef ECONNRESET
2762  if (!stream && errno == ECONNRESET)
2763  close();
2764 #endif
2765  break;
2766  }
2767  }
2768  }
2769 
2770  total += rc;
2771 
2772  //printd(5, "qore_socket_private::send() bs: %ld rc: " QSD " len: " QSD " (total: " QSD ") errno: %d\n", bs, rc, size - bs, size, errno);
2773  if (rc < 0 || sock == QORE_INVALID_SOCKET)
2774  break;
2775 
2776  bs += rc;
2777 
2778  do_send_event(rc, bs, size);
2779 
2780  if (bs >= size)
2781  break;
2782  }
2783 
2784  return rc;
2785  }
2786 
2787  DLLLOCAL int send(int fd, qore_offset_t size, int timeout_ms, ExceptionSink* xsink);
2788 
2789  DLLLOCAL int send(ExceptionSink* xsink, const char* cname, const char* mname, const char* buf, size_t size,
2790  int timeout_ms = -1, int source = QORE_SOURCE_SOCKET) {
2791  assert(xsink);
2792  if (sock == QORE_INVALID_SOCKET) {
2793  se_not_open(cname, mname, xsink, "send");
2794  return QSE_NOT_OPEN;
2795  }
2796  if (in_op >= 0) {
2797  if (in_op == q_gettid()) {
2798  se_in_op(cname, mname, xsink);
2799  return 0;
2800  }
2801  se_in_op_thread(cname, mname, xsink);
2802  return 0;
2803  }
2804  if (!size) {
2805  return 0;
2806  }
2807 
2808  PrivateQoreSocketThroughputHelper th(this, true);
2809 
2810  // set the non-blocking flag (for use with non-ssl connections)
2811  bool nb = (timeout_ms >= 0);
2812  // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2813  OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2814  if (*xsink) {
2815  return -1;
2816  }
2817 
2818  int64 total = 0;
2819  qore_offset_t rc = sendIntern(xsink, cname, mname, buf, size, timeout_ms, total);
2820  th.finalize(total);
2821 
2822  if (rc > 0 && source > 0) {
2823  do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, buf, size);
2824  }
2825 
2826  return rc < 0 || sock == QORE_INVALID_SOCKET ? rc : 0;
2827  }
2828 
2829  DLLLOCAL void sendFromInputStream(InputStream *is, int64 size, int64 timeout, ExceptionSink *xsink,
2830  QoreThreadLock* l) {
2831  if (sock == QORE_INVALID_SOCKET) {
2832  se_not_open("Socket", "sendFromInputStream", xsink);
2833  return;
2834  }
2835  if (in_op >= 0) {
2836  if (in_op == q_gettid()) {
2837  se_in_op("Socket", "sendFromInputStream", xsink);
2838  return;
2839  }
2840  se_in_op_thread("Socket", "sendFromInputStream", xsink);
2841  return;
2842  }
2843 
2844  qore_socket_op_helper oh(this);
2845 
2846  PrivateQoreSocketThroughputHelper th(this, true);
2847 
2848  // set the non-blocking flag (for use with non-ssl connections)
2849  bool nb = (timeout >= 0);
2850  // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2851  OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2852  if (*xsink)
2853  return;
2854 
2855  char buf[DEFAULT_SOCKET_BUFSIZE];
2856  int64 sent = 0;
2857  int64 total = 0;
2858  while (size < 0 || sent < size) {
2859  int64 toRead = size < 0 ? DEFAULT_SOCKET_BUFSIZE : QORE_MIN(size - sent, DEFAULT_SOCKET_BUFSIZE);
2860  int64 r;
2861  {
2862  AutoUnlocker al(l);
2863  r = is->read(buf, toRead, xsink);
2864  if (*xsink) {
2865  return;
2866  }
2867  }
2868  if (r == 0) {
2869  //eof
2870  if (size >= 0) {
2871  //not all size bytes were sent
2872  xsink->raiseException("SOCKET-SEND-ERROR", "Unexpected end of stream");
2873  return;
2874  }
2875  break;
2876  }
2877 
2878  qore_offset_t rc = sendIntern(xsink, "Socket", "sendFromInputStream", buf, r, timeout, total);
2879  if (rc < 0) {
2880  return;
2881  }
2882  do_data_event(QORE_EVENT_SOCKET_DATA_SENT, QORE_SOURCE_SOCKET, buf, r);
2883  sent += r;
2884  }
2885  th.finalize(total);
2886  }
2887 
2888  DLLLOCAL void sendHttpChunkedBodyFromInputStream(InputStream* is, size_t max_chunk_size, int timeout, ExceptionSink* xsink, QoreThreadLock* l, const ResolvedCallReferenceNode* trailer_callback) {
2889  if (sock == QORE_INVALID_SOCKET) {
2890  se_not_open("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2891  return;
2892  }
2893  if (in_op >= 0) {
2894  if (in_op == q_gettid()) {
2895  se_in_op("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2896  return;
2897  }
2898  se_in_op_thread("Socket", "sendHttpChunkedBodyFromInputStream", xsink);
2899  return;
2900  }
2901 
2902  qore_socket_op_helper oh(this);
2903 
2904  PrivateQoreSocketThroughputHelper th(this, true);
2905 
2906  // set the non-blocking flag (for use with non-ssl connections)
2907  bool nb = (timeout >= 0);
2908  // set non-blocking I/O (and restore on exit) if we have a timeout and a non-ssl connection
2909  OptionalNonBlockingHelper onbh(*this, !ssl && nb, xsink);
2910  if (*xsink)
2911  return;
2912 
2914  // reserve enough space for the maximum size of the buffer + HTTP overhead
2915  buf->preallocate(max_chunk_size);
2916  int64 total = 0;
2917  while (true) {
2918  int64 r;
2919  {
2920  AutoUnlocker al(l);
2921  r = is->read((void*)buf->getPtr(), sizeof(max_chunk_size), xsink);
2922  if (*xsink)
2923  return;
2924  }
2925 
2926  // send HTTP chunk prelude with chunk size
2927  QoreString str;
2928  str.sprintf("%x\r\n", (int)r);
2929  int rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
2930  if (rc < 0)
2931  return;
2932 
2933  bool trailers = false;
2934 
2935  // send chunk data, if any
2936  if (r) {
2937  rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", (const char*)buf->getPtr(), r, timeout, total, true);
2938  if (rc < 0)
2939  return;
2940  do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_SENT, QORE_SOURCE_SOCKET, buf->getPtr(), r);
2941  } else if (trailer_callback) {
2942  // get and send chunk trailers, if any
2944 
2945  if (runTrailerCallback(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", *trailer_callback, l, h))
2946  return;
2947  if (h) {
2948  str.clear();
2949  do_headers(str, *h, 0, false);
2950 
2951  rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
2952  if (rc < 0)
2953  return;
2954 
2955  do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, QORE_SOURCE_SOCKET, **h);
2956  trailers = true;
2957  }
2958  }
2959 
2960  // close chunk if we sent no trailers
2961  if (!trailers) {
2962  str.set("\r\n");
2963  rc = sendIntern(xsink, "Socket", "sendHttpChunkedBodyFromInputStream", str.c_str(), str.size(), timeout, total, true);
2964  if (rc < 0)
2965  return;
2966  }
2967 
2968  if (!r) {
2969  // end of stream
2970  break;
2971  }
2972  }
2973  th.finalize(total);
2974  }
2975 
2976  DLLLOCAL void sendHttpChunkedBodyTrailer(const QoreHashNode* headers, int timeout, ExceptionSink* xsink) {
2977  if (sock == QORE_INVALID_SOCKET) {
2978  se_not_open("Socket", "sendHttpChunkedBodyTrailer", xsink);
2979  return;
2980  }
2981  if (in_op >= 0) {
2982  if (in_op == q_gettid()) {
2983  se_in_op("Socket", "sendHttpChunkedBodyTrailer", xsink);
2984  return;
2985  }
2986  se_in_op_thread("Socket", "sendHttpChunkedBodyTrailer", xsink);
2987  return;
2988  }
2989 
2990  QoreString buf;
2991  if (!headers) {
2992  ConstHashIterator hi(headers);
2993 
2994  while (hi.next()) {
2995  const QoreValue v = hi.get();
2996  const char* key = hi.getKey();
2997 
2998  if (v.getType() == NT_LIST) {
2999  ConstListIterator li(v.get<const QoreListNode>());
3000  while (li.next())
3001  do_header(key, buf, li.getValue());
3002  }
3003  else
3004  do_header(key, buf, v);
3005  }
3006  }
3007  buf.concat("\r\n");
3008  int64 total;
3009  sendIntern(xsink, "Socket", "sendHttpChunkedBodyTrailer", buf.getBuffer(), buf.size(), timeout, total, true);
3010  if (!*xsink) {
3011  do_header_event(QORE_EVENT_HTTP_FOOTERS_SENT, QORE_SOURCE_SOCKET, *headers);
3012  }
3013  }
3014 
3015  DLLLOCAL void getSendHttpMessageHeaders(QoreString& hdr, QoreHashNode* info, const char* method, const char* path,
3016  const char* http_version, const QoreHashNode* headers, size_t size, int source) {
3017  // prepare header string
3018  hdr.sprintf("%s %s HTTP/%s", method, path && path[0] ? path : "/", http_version);
3019 
3020  // write request-uri key if info hash is non-null
3021  if (info) {
3022  info->setKeyValue("request-uri", new QoreStringNode(hdr), nullptr);
3023  }
3024 
3025  getSendHttpMessageHeadersCommon(hdr, info, headers, size, source);
3026  }
3027 
3028  DLLLOCAL void getSendHttpMessageHeadersCommon(QoreString& hdr, QoreHashNode* info, const QoreHashNode* headers,
3029  size_t size, int source) {
3030  // send event
3031  do_send_http_message_event(hdr, headers, source);
3032 
3033  // add headers
3034  hdr.concat("\r\n");
3035  // insert headers
3036  do_headers(hdr, headers, size);
3037  }
3038 
3039  DLLLOCAL int sendHttpMessage(ExceptionSink* xsink, QoreHashNode* info, const char* cname, const char* mname,
3040  const char* method, const char* path, const char* http_version, const QoreHashNode* headers,
3041  const QoreStringNode* body, const void* data, size_t size,
3042  const ResolvedCallReferenceNode* send_callback, InputStream* input_stream, size_t max_chunk_size,
3043  const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3044  QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3045  // prepare header string
3046  QoreString hdr(enc);
3047 
3048  hdr.sprintf("%s %s HTTP/%s", method, path && path[0] ? path : "/", http_version);
3049 
3050  // write request-uri key if info hash is non-null
3051  if (info) {
3052  info->setKeyValue("request-uri", new QoreStringNode(hdr), nullptr);
3053  }
3054 
3055  return sendHttpMessageCommon(xsink, hdr, info, cname, mname, headers, body, data, size, send_callback,
3056  input_stream, max_chunk_size, trailer_callback, source, timeout_ms, l, aborted);
3057  }
3058 
3059  DLLLOCAL int sendHttpResponse(ExceptionSink* xsink, QoreHashNode* info, const char* cname, const char* mname,
3060  int code, const char* desc, const char* http_version, const QoreHashNode* headers, const QoreStringNode* body,
3061  const void* data, size_t size, const ResolvedCallReferenceNode* send_callback, InputStream* input_stream,
3062  size_t max_chunk_size, const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3063  QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3064  // prepare header string
3065  QoreString hdr(enc);
3066 
3067  // write HTTP response status line
3068  hdr.sprintf("HTTP/%s %03d %s", http_version, code, desc);
3069 
3070  // write the status line as the "response-uri" key if info hash is non-null
3071  // NOTE: this is not a URI, so the name is not really appropriate
3072  if (info) {
3073  info->setKeyValue("response-uri", new QoreStringNode(hdr), nullptr);
3074  }
3075 
3076  return sendHttpMessageCommon(xsink, hdr, info, cname, mname, headers, body, data, size, send_callback,
3077  input_stream, max_chunk_size, trailer_callback, source, timeout_ms, l, aborted);
3078  }
3079 
3080  DLLLOCAL int sendHttpMessageCommon(ExceptionSink* xsink, QoreString& hdr, QoreHashNode* info, const char* cname,
3081  const char* mname, const QoreHashNode* headers, const QoreStringNode* body, const void* data,
3082  size_t size, const ResolvedCallReferenceNode* send_callback, InputStream* input_stream,
3083  size_t max_chunk_size, const ResolvedCallReferenceNode* trailer_callback, int source, int timeout_ms = -1,
3084  QoreThreadLock* l = nullptr, bool* aborted = nullptr) {
3085  assert(xsink);
3086  assert(!(data && send_callback));
3087  assert(!(data && input_stream));
3088  assert(!(send_callback && input_stream));
3089 
3090  // send event
3091  do_send_http_message_event(hdr, headers, source);
3092 
3093  // add headers
3094  hdr.concat("\r\n");
3095  // insert headers
3096  do_headers(hdr, headers, size && data ? size : 0);
3097 
3098  //printd(5, "qore_socket_private::sendHttpMessage() hdr: %s\n", hdr.c_str());
3099 
3100  // send URI and headers
3101  int rc;
3102  if ((rc = send(xsink, cname, mname, hdr.c_str(), hdr.size(), timeout_ms, -1)))
3103  return rc;
3104 
3105  // header message sent above with do_sent_http_message_event()
3106  if (size && data) {
3107  int rc = send(xsink, cname, mname, (char*)data, size, timeout_ms, -1);
3108  if (!rc) {
3109  if (body) {
3110  do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, *body);
3111  } else {
3112  do_data_event(QORE_EVENT_SOCKET_DATA_SENT, source, data, size);
3113  }
3114  }
3115  return rc;
3116  } else if (send_callback) {
3117  assert(l);
3118  assert(!aborted || !(*aborted));
3119  return sendHttpChunkedWithCallback(xsink, cname, mname, *send_callback, *l, source, timeout_ms, aborted);
3120  } else if (input_stream) {
3121  assert(l);
3122  sendHttpChunkedBodyFromInputStream(input_stream, max_chunk_size, timeout_ms, xsink, l, trailer_callback);
3123  return *xsink ? -1 : 0;
3124  }
3125 
3126  return 0;
3127  }
3128 
3129  DLLLOCAL QoreHashNode* readHttpChunkedBodyBinary(int timeout, ExceptionSink* xsink, const char* cname, int source, const ResolvedCallReferenceNode* recv_callback = nullptr, QoreThreadLock* l = nullptr, QoreObject* obj = nullptr, OutputStream* os = nullptr) {
3130  assert(xsink);
3131 
3132  if (sock == QORE_INVALID_SOCKET) {
3133  se_not_open(cname, "readHTTPChunkedBodyBinary", xsink);
3134  return 0;
3135  }
3136  if (in_op >= 0) {
3137  if (in_op == q_gettid()) {
3138  se_in_op(cname, "readHTTPChunkedBodyBinary", xsink);
3139  return 0;
3140  }
3141  se_in_op_thread(cname, "readHTTPChunkedBodyBinary", xsink);
3142  return 0;
3143  }
3144 
3145  // reset "expecting HTTP chunked body" flag
3146  if (http_exp_chunked_body)
3147  http_exp_chunked_body = false;
3148 
3149  qore_socket_op_helper oh(this);
3150 
3151  SimpleRefHolder<BinaryNode> b(os ? nullptr : new BinaryNode);
3152  QoreString str; // for reading the size of each chunk
3153 
3154  qore_offset_t rc;
3155  // read the size then read the data and append to buffer
3156  while (true) {
3157  // state = 0, nothing
3158  // state = 1, \r received
3159  int state = 0;
3160  while (true) {
3161  char* buf;
3162  rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, 1, 0, timeout, false);
3163  if (rc <= 0) {
3164  if (!*xsink) {
3165  assert(!rc);
3166  se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3167  }
3168  return 0;
3169  }
3170 
3171  char c = buf[0];
3172 
3173  if (!state && c == '\r')
3174  state = 1;
3175  else if (state && c == '\n')
3176  break;
3177  else {
3178  if (state) {
3179  state = 0;
3180  str.concat('\r');
3181  }
3182  str.concat(c);
3183  }
3184  }
3185  // DEBUG
3186  //printd(5, "QoreSocket::readHTTPChunkedBodyBinary(): got chunk size (" QSD " bytes) string: %s\n", str.strlen(), str.getBuffer());
3187 
3188  // terminate string at ';' char if present
3189  char* p = (char*)strchr(str.getBuffer(), ';');
3190  if (p)
3191  *p = '\0';
3192  long size = strtol(str.c_str(), 0, 16);
3193  do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.size(), source);
3194 
3195  if (!size)
3196  break;
3197 
3198  if (size < 0) {
3199  xsink->raiseException("READ-HTTP-CHUNK-ERROR", "negative value given for chunk size (%ld)", size);
3200  return 0;
3201  }
3202 
3203  // prepare string for chunk
3204  //str.allocate(size + 1);
3205 
3206  qore_offset_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
3207  qore_offset_t br = 0; // bytes received
3208  while (true) {
3209  char* buf;
3210  rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, bs, 0, timeout, false);
3211  //printd(5, "qore_socket_private::readHTTPChunkedBodyBinary() str: '%s' bs: %lld rc: %lld b: %p (%lld) recv_callback: %p\n", str.c_str(), bs, rc, *b, b->size(), recv_callback);
3212  if (rc <= 0) {
3213  if (!*xsink) {
3214  assert(!rc);
3215  se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3216  }
3217  return nullptr;
3218  }
3219 
3220  do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_READ, source, buf, (size_t)rc);
3221 
3222  if (os) {
3223  AutoUnlocker al(l);
3224  os->write(buf, rc, xsink);
3225  if (*xsink)
3226  return nullptr;
3227  } else {
3228  b->append(buf, rc);
3229  }
3230  br += rc;
3231 
3232  if (br >= size)
3233  break;
3234  if (size - br < bs)
3235  bs = size - br;
3236  }
3237 
3238  // DEBUG
3239  //printd(5, "QoreSocket::readHTTPChunkedBodyBinary(): received binary chunk: size: %d br=" QSD " total=" QSD "\n", size, br, b->size());
3240 
3241  // read crlf after chunk
3242  // FIXME: bytes read are not checked if they equal CRLF
3243  br = 0;
3244  while (br < 2) {
3245  char* buf;
3246  rc = brecv(xsink, "readHTTPChunkedBodyBinary", buf, 2 - br, 0, timeout, false);
3247  if (rc <= 0) {
3248  if (!*xsink) {
3249  assert(!rc);
3250  se_closed(cname, "readHTTPChunkedBodyBinary", xsink);
3251  }
3252  return nullptr;
3253  }
3254  br += rc;
3255  }
3256 
3257  do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
3258 
3259  if (recv_callback && !os) {
3260  if (runDataCallback(xsink, cname, "readHTTPChunkedBodyBinary", *recv_callback, l, *b, true))
3261  return nullptr;
3262  if (b)
3263  b->clear();
3264  }
3265 
3266  // ensure string is blanked for next read
3267  str.clear();
3268  }
3269 
3270  // read footers or nothing
3271  QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPChunkedBodyBinary", timeout, rc, true));
3272  if (*xsink)
3273  return nullptr;
3274 
3275  ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
3276  if (!recv_callback && !os) {
3277  h->setKeyValue("body", b.release(), xsink);
3278  }
3279 
3280  ReferenceHolder<QoreHashNode> info(xsink);
3281 
3282  if (hdr) {
3283  if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
3284  return recv_callback ? 0 : h.release();
3285 
3286  if (recv_callback) {
3287  info = new QoreHashNode(autoTypeInfo);
3288  }
3289  convertHeaderToHash(*h, (char*)hdr->c_str(), 0, *info, nullptr, "response-headers-raw");
3290  do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
3291  }
3292 
3293  if (recv_callback) {
3294  runHeaderCallback(xsink, cname, "readHTTPChunkedBodyBinary", *recv_callback, l, h->empty() ? nullptr : *h,
3295  info.release(), false, obj);
3296  return 0;
3297  }
3298 
3299  return h.release();
3300  }
3301 
3302  // receive a message in HTTP chunked format
3303  DLLLOCAL QoreHashNode* readHttpChunkedBody(int timeout, ExceptionSink* xsink, const char* cname, int source, const ResolvedCallReferenceNode* recv_callback = 0, QoreThreadLock* l = 0, QoreObject* obj = 0) {
3304  assert(xsink);
3305 
3306  if (sock == QORE_INVALID_SOCKET) {
3307  se_not_open(cname, "readHTTPChunkedBody", xsink);
3308  return 0;
3309  }
3310  if (in_op >= 0) {
3311  if (in_op == q_gettid()) {
3312  se_in_op(cname, "readHTTPChunkedBody", xsink);
3313  return 0;
3314  }
3315  se_in_op_thread(cname, "readHTTPChunkedBody", xsink);
3316  return 0;
3317  }
3318 
3319  // reset "expecting HTTP chunked body" flag
3320  if (http_exp_chunked_body)
3321  http_exp_chunked_body = false;
3322 
3323  qore_socket_op_helper oh(this);
3324 
3325  QoreStringNodeHolder buf(new QoreStringNode(enc));
3326  QoreString str; // for reading the size of each chunk
3327 
3328  qore_offset_t rc;
3329  // read the size then read the data and append to buf
3330  while (true) {
3331  // state = 0, nothing
3332  // state = 1, \r received
3333  int state = 0;
3334  while (true) {
3335  char* tbuf;
3336  rc = brecv(xsink, "readHTTPChunkedBody", tbuf, 1, 0, timeout, false);
3337  if (rc <= 0) {
3338  if (!*xsink) {
3339  assert(!rc);
3340  se_closed(cname, "readHTTPChunkedBody", xsink);
3341  }
3342  return 0;
3343  }
3344 
3345  char c = tbuf[0];
3346 
3347  if (!state && c == '\r')
3348  state = 1;
3349  else if (state && c == '\n')
3350  break;
3351  else {
3352  if (state) {
3353  state = 0;
3354  str.concat('\r');
3355  }
3356  str.concat(c);
3357  }
3358  }
3359  // DEBUG
3360  //printd(5, "got chunk size (" QSD " bytes) string: %s\n", str.strlen(), str.getBuffer());
3361 
3362  // terminate string at ';' char if present
3363  char* p = (char*)strchr(str.getBuffer(), ';');
3364  if (p)
3365  *p = '\0';
3366  qore_offset_t size = strtol(str.getBuffer(), 0, 16);
3367  do_chunked_read(QORE_EVENT_HTTP_CHUNK_SIZE, size, str.strlen(), source);
3368 
3369  if (!size)
3370  break;
3371 
3372  if (size < 0) {
3373  xsink->raiseException("READ-HTTP-CHUNK-ERROR", "negative value given for chunk size (%ld)", size);
3374  return 0;
3375  }
3376  // ensure string is blanked for next read
3377  str.clear();
3378 
3379  // prepare string for chunk
3380  //buf->allocate((unsigned)(buf->strlen() + size + 1));
3381 
3382  // read chunk directly into string buffer
3383  qore_offset_t bs = size < DEFAULT_SOCKET_BUFSIZE ? size : DEFAULT_SOCKET_BUFSIZE;
3384  qore_offset_t br = 0; // bytes received
3385  str.clear();
3386  while (true) {
3387  char* tbuf;
3388  rc = brecv(xsink, "readHTTPChunkedBody", tbuf, bs, 0, timeout, false);
3389  if (rc <= 0) {
3390  if (!*xsink) {
3391  assert(!rc);
3392  se_closed(cname, "readHTTPChunkedBody", xsink);
3393  }
3394  return 0;
3395  }
3396 
3397  br += rc;
3398  buf->concat(tbuf, rc);
3399 
3400  do_data_event(QORE_EVENT_HTTP_CHUNKED_DATA_READ, source, tbuf, (size_t)rc);
3401 
3402  if (br >= size)
3403  break;
3404  if (size - br < bs)
3405  bs = size - br;
3406  }
3407 
3408  // DEBUG
3409  //printd(5, "got chunk (" QSD " bytes): %s\n", br, buf->getBuffer() + buf->strlen() - size);
3410 
3411  // read crlf after chunk
3412  // FIXME: bytes read are not checked if they equal CRLF
3413  br = 0;
3414  while (br < 2) {
3415  char* tbuf;
3416  rc = brecv(xsink, "readHTTPChunkedBody", tbuf, 2 - br, 0, timeout, false);
3417  if (rc <= 0) {
3418  if (!*xsink) {
3419  assert(!rc);
3420  se_closed(cname, "readHTTPChunkedBody", xsink);
3421  }
3422  return nullptr;
3423  }
3424  br += rc;
3425  }
3426 
3427  do_chunked_read(QORE_EVENT_HTTP_CHUNKED_DATA_RECEIVED, size, size + 2, source);
3428 
3429  if (recv_callback) {
3430  if (runDataCallback(xsink, cname, "readHTTPChunkedBody", *recv_callback, l, *buf, true))
3431  return nullptr;
3432  buf->clear();
3433  }
3434  }
3435 
3436  // read footers or nothing
3437  QoreStringNodeHolder hdr(readHTTPData(xsink, "readHTTPChunkedBody", timeout, rc, true));
3438  if (*xsink)
3439  return nullptr;
3440 
3441  //printd(5, "chunked body encoding: %s\n", buf->getEncoding()->getCode());
3442  ReferenceHolder<QoreHashNode> h(new QoreHashNode(autoTypeInfo), xsink);
3443  if (!recv_callback) {
3444  h->setKeyValue("body", buf.release(), xsink);
3445  }
3446 
3447  ReferenceHolder<QoreHashNode> info(xsink);
3448 
3449  if (hdr) {
3450  if (hdr->strlen() >= 2 && hdr->strlen() <= 4)
3451  return recv_callback ? 0 : h.release();
3452 
3453  if (recv_callback) {
3454  info = new QoreHashNode(autoTypeInfo);
3455  }
3456  convertHeaderToHash(*h, (char*)hdr->c_str(), 0, *info, nullptr, "response-headers-raw");
3457  do_read_http_header(QORE_EVENT_HTTP_FOOTERS_RECEIVED, *h, source);
3458  }
3459 
3460  if (recv_callback) {
3461  runHeaderCallback(xsink, cname, "readHTTPChunkedBody", *recv_callback, l, h->empty() ? nullptr : *h,
3462  info.release(), false, obj);
3463  return 0;
3464  }
3465 
3466  return h.release();
3467  }
3468 
3469  DLLLOCAL static void do_accept_encoding(char* t, QoreHashNode& info) {
3470  ReferenceHolder<QoreListNode> l(new QoreListNode(autoTypeInfo), 0);
3471 
3472  char* a = t;
3473  bool ok = true;
3474  while (*a) {
3475  if (ok) {
3476  ok = false;
3478  while (*a && *a != ';' && *a != ',')
3479  str->concat(*(a++));
3480  str->trim();
3481  if (!str->empty())
3482  l->push(str.release(), nullptr);
3483  continue;
3484  }
3485  else if (*a == ',')
3486  ok = true;
3487 
3488  ++a;
3489  }
3490 
3491  if (!l->empty())
3492  info.setKeyValue("accept-encoding", l.release(), 0);
3493  }
3494 
3495  DLLLOCAL bool do_accept_charset(char* t, QoreHashNode& info) {
3496  bool acceptcharset = false;
3497 
3498  // see if we have "*" or utf8 or utf-8, in which case set it
3499  // otherwise set the first charset in the list
3500  char* a = t;
3501  char* div = 0;
3502  bool utf8 = false;
3503  bool ok = true;
3504  while (*a) {
3505  if (ok) {
3506  if (*a == '*') {
3507  utf8 = true;
3508  break;
3509  }
3510  ok = false;
3511  if (*a == 'u' || *a == 'U') {
3512  ++a;
3513  if (*a == 't' || *a == 'T') {
3514  ++a;
3515  if (*a == 'f' || *a == 'F') {
3516  ++a;
3517  if (*a == '-')
3518  ++a;
3519  if (*a == '8') {
3520  utf8 = true;
3521  break;
3522  }
3523  }
3524  }
3525  continue;
3526  }
3527  } else if (*a == ',') {
3528  if (!div)
3529  div = a;
3530  ok = true;
3531  } else if (*a == ';') {
3532  if (!div)
3533  div = a;
3534  }
3535 
3536  ++a;
3537  }
3538  if (utf8) {
3539  info.setKeyValue("accept-charset", new QoreStringNode("utf8"), 0);
3540  acceptcharset = true;
3541  } else {
3543  if (div)
3544  ac->concat(t, div - t);
3545  else
3546  ac->concat(t);
3547  ac->trim();
3548  if (!ac->empty()) {
3549  info.setKeyValue("accept-charset", ac.release(), 0);
3550  acceptcharset = true;
3551  }
3552  }
3553 
3554  return acceptcharset;
3555  }
3556 
3557  // returns true if the connection should be closed, false if not
3558  DLLLOCAL bool convertHeaderToHash(QoreHashNode* h, char* p, int flags = 0, QoreHashNode* info = nullptr,
3559  bool* chunked = nullptr, const char* headers_raw_key = "headers-raw") {
3560  bool close = !(flags & CHF_HTTP11);
3561  // socket encoding
3562  const char* senc = nullptr;
3563  // accept-charset
3564  bool acceptcharset = false;
3565 
3566  QoreHashNode* raw_hdr = nullptr;
3567  if (info) {
3568  info->setKeyValue(headers_raw_key, raw_hdr = new QoreHashNode(autoTypeInfo), nullptr);
3569  }
3570 
3571  // raw key for setting raw headers
3572  std::string raw_key;
3573 
3574  while (*p) {
3575  char* buf = p;
3576 
3577  if ((p = strstr(buf, "\r\n"))) {
3578  *p = '\0';
3579  p += 2;
3580  } else if ((p = strchr(buf, '\n'))) {
3581  *p = '\0';
3582  p++;
3583  } else if ((p = strchr(buf, '\r'))) {
3584  *p = '\0';
3585  p++;
3586  } else
3587  break;
3588  char* t = strchr(buf, ':');
3589  if (!t)
3590  break;
3591  *t = '\0';
3592  t++;
3593  while (t && qore_isblank(*t))
3594  t++;
3595  if (raw_hdr) {
3596  raw_key = buf;
3597  }
3598  strtolower(buf);
3599  //printd(5, "setting %s = '%s'\n", buf, t);
3600 
3601  ReferenceHolder<> val(new QoreStringNode(t), nullptr);
3602 
3603  if (flags & CHF_PROCESS) {
3604  if (!strcmp(buf, "connection")) {
3605  if (flags & CHF_HTTP11) {
3606  if (strcasestr(t, "close"))
3607  close = true;
3608  } else {
3609  if (strcasestr(t, "keep-alive"))
3610  close = false;
3611  }
3612  } else if (!strcmp(buf, "content-type")) {
3613  char* a = strcasestr(t, "charset=");
3614  if (a) {
3615  // find end
3616  char* e = strchr(a + 8, ';');
3617 
3618  QoreString cs;
3619  if (e)
3620  cs.concat(a + 8, e - a - 8);
3621  else
3622  cs.concat(a + 8);
3623  cs.trim();
3624  senc = cs.getBuffer();
3625  //printd(5, "got encoding '%s' from request\n", senc);
3626  enc = QEM.findCreate(senc);
3627 
3628  if (info) {
3629  size_t len = cs.size();
3630  info->setKeyValue("charset", new QoreStringNode(cs.giveBuffer(), len, len + 1, QCS_DEFAULT), nullptr);
3631  }
3632 
3633  if (info) {
3635  // remove any whitespace and ';' before charset=
3636  if (a != t) {
3637  do {
3638  --a;
3639  } while (a > t && (*a == ' ' || *a == ';'));
3640  }
3641 
3642  if (a == t) {
3643  if (e)
3644  ct->concat(e + 1);
3645  } else {
3646  ct->concat(t, a - t + 1);
3647  if (e)
3648  ct->concat(e);
3649  }
3650  ct->trim();
3651  if (!ct->empty())
3652  info->setKeyValue("body-content-type", ct.release(), nullptr);
3653  }
3654  } else {
3655  enc = QEM.findCreate(assume_http_encoding.c_str());
3656  if (info) {
3657  info->setKeyValue("charset", new QoreStringNode(assume_http_encoding), nullptr);
3658  info->setKeyValue("body-content-type", val->refSelf(), nullptr);
3659  }
3660  }
3661  } else if (chunked && !strcmp(buf, "transfer-encoding") && !strcasecmp(t, "chunked")) {
3662  *chunked = true;
3663  } else if (info) {
3664  if (!strcmp(buf, "accept-charset"))
3665  acceptcharset = do_accept_charset(t, *info);
3666  else if ((flags & CHF_REQUEST) && !strcmp(buf, "accept-encoding"))
3667  do_accept_encoding(t, *info);
3668  }
3669  }
3670 
3671  ReferenceHolder<> val_copy(nullptr);
3672  if (raw_hdr && val) {
3673  val_copy = val->realCopy();
3674  }
3675 
3676  // see if header exists, and if so make it a list and add value to the list
3677  hash_assignment_priv ha(*h, buf);
3678  if (!(*ha).isNothing()) {
3679  QoreListNode* l;
3680  if ((*ha).getType() == NT_LIST) {
3681  l = (*ha).get<QoreListNode>();
3682  } else {
3683  l = new QoreListNode(autoTypeInfo);
3684  l->push(ha.swap(l), nullptr);
3685  }
3686  l->push(val.release(), nullptr);
3687  } else // otherwise set header normally
3688  ha.assign(val.release(), 0);
3689 
3690  // set raw headers if applicable
3691  if (raw_hdr) {
3692  hash_assignment_priv ha(*raw_hdr, raw_key);
3693  if (!(*ha).isNothing()) {
3694  QoreListNode* l;
3695  if ((*ha).getType() == NT_LIST) {
3696  l = (*ha).get<QoreListNode>();
3697  } else {
3698  l = new QoreListNode(autoTypeInfo);
3699  l->push(ha.swap(l), nullptr);
3700  }
3701  l->push(val_copy.release(), nullptr);
3702  } else // otherwise set header normally
3703  ha.assign(val_copy.release(), nullptr);
3704  }
3705  }
3706 
3707  if ((flags & CHF_PROCESS)) {
3708  if (!senc)
3709  enc = QEM.findCreate(assume_http_encoding.c_str());
3710  // according to RFC-2616 section 14.2, "If no Accept-Charset header is present, the default is that any character set is acceptable" so we will use utf-8
3711  if (info && !acceptcharset)
3712  info->setKeyValue("accept-charset", new QoreStringNode("utf8"), nullptr);
3713  }
3714 
3715  return close;
3716  }
3717 
3718  DLLLOCAL int recvix(const char* meth, int len, void* targ, int timeout_ms, ExceptionSink* xsink) {
3719  assert(xsink);
3720  if (sock == QORE_INVALID_SOCKET) {
3721  se_not_open("Socket", meth, xsink, "recvix");
3722  return QSE_NOT_OPEN;
3723  }
3724  if (in_op >= 0) {
3725  if (in_op == q_gettid()) {
3726  se_in_op("Socket", meth, xsink);
3727  return 0;
3728  }
3729  se_in_op_thread("Socket", meth, xsink);
3730  return 0;
3731  }
3732 
3733  PrivateQoreSocketThroughputHelper th(this, false);
3734 
3735  char* buf;
3736  qore_offset_t br = 0;
3737  while (true) {
3738  qore_offset_t rc = brecv(xsink, meth, buf, len - br, 0, timeout_ms);
3739  if (rc <= 0) {
3740  do_read_error(rc, meth, timeout_ms, xsink);
3741  return (int)rc;
3742  }
3743 
3744  memcpy(targ, buf, rc);
3745 
3746  br += rc;
3747  if (br >= len)
3748  break;
3749  }
3750 
3751  th.finalize(br);
3752  do_data_event(QORE_EVENT_SOCKET_DATA_READ, QORE_SOURCE_SOCKET, targ, br);
3753  return (int)br;
3754  }
3755 
3756  DLLLOCAL void clearWarningQueue(ExceptionSink* xsink) {
3757  if (warn_queue) {
3758  if (warn_callback_arg) {
3759  warn_callback_arg.discard(xsink);
3760  warn_callback_arg = QoreValue();
3761  }
3762  warn_queue->deref(xsink);
3763  warn_queue = nullptr;
3764  tl_warning_us = 0;
3765  tp_warning_bs = 0.0;
3766  tp_us_min = 0;
3767  }
3768  }
3769 
3770  DLLLOCAL void setWarningQueue(ExceptionSink* xsink, int64 warning_ms, int64 warning_bs, Queue* wq, QoreValue arg, int64 min_ms = 1000) {
3771  ReferenceHolder<Queue> qholder(wq, xsink);
3772  ValueHolder holder(arg, xsink);
3773  if (warning_ms <= 0 && warning_bs <= 0) {
3774  xsink->raiseException("SOCKET-SETWARNINGQUEUE-ERROR", "Socket::setWarningQueue() at least one of warning ms argument: " QLLD " and warning B/s argument: " QLLD " must be greater than zero; to clear, call Socket::clearWarningQueue() with no arguments", warning_ms, warning_bs);
3775  return;
3776  }
3777 
3778  if (warning_ms < 0)
3779  warning_ms = 0;
3780  if (warning_bs < 0)
3781  warning_bs = 0;
3782 
3783  if (warn_queue) {
3784  warn_queue->deref(xsink);
3785  warn_callback_arg.discard(xsink);
3786  }
3787 
3788  warn_queue = qholder.release();
3789  warn_callback_arg = holder.release();
3790  tl_warning_us = (int64)warning_ms * 1000;
3791  tp_warning_bs = warning_bs;
3792  tp_us_min = min_ms * 1000;
3793  }
3794 
3795  DLLLOCAL void getUsageInfo(QoreHashNode& h, qore_socket_private& s) const {
3796  if (warn_queue) {
3797  h.setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3798  h.setKeyValue("timeout", tl_warning_us, 0);
3799  h.setKeyValue("min_throughput", (int64)tp_warning_bs, 0);
3800  h.setKeyValue("min_throughput_us", (int64)tp_us_min, 0);
3801  }
3802 
3803  h.setKeyValue("bytes_sent", tp_bytes_sent + s.tp_bytes_sent, 0);
3804  h.setKeyValue("bytes_recv", tp_bytes_recv + s.tp_bytes_sent, 0);
3805  h.setKeyValue("us_sent", tp_us_sent + s.tp_us_sent, 0);
3806  h.setKeyValue("us_recv", tp_us_recv + s.tp_us_recv, 0);
3807  }
3808 
3809  DLLLOCAL void getUsageInfo(QoreHashNode& h) const {
3810  if (warn_queue) {
3811  h.setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3812  h.setKeyValue("timeout", tl_warning_us, 0);
3813  h.setKeyValue("min_throughput", (int64)tp_warning_bs, 0);
3814  h.setKeyValue("min_throughput_us", (int64)tp_us_min, 0);
3815  }
3816 
3817  h.setKeyValue("bytes_sent", tp_bytes_sent, 0);
3818  h.setKeyValue("bytes_recv", tp_bytes_recv, 0);
3819  h.setKeyValue("us_sent", tp_us_sent, 0);
3820  h.setKeyValue("us_recv", tp_us_recv, 0);
3821  }
3822 
3823  DLLLOCAL QoreHashNode* getUsageInfo() const {
3824  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3825  getUsageInfo(*h);
3826  return h;
3827  }
3828 
3829  DLLLOCAL void clearStats() {
3830  tp_bytes_sent = 0;
3831  tp_bytes_recv = 0;
3832  tp_us_sent = 0;
3833  tp_us_recv = 0;
3834  }
3835 
3836  DLLLOCAL void doTimeoutWarning(const char* op, int64 dt) {
3837  assert(warn_queue);
3838  assert(dt > tl_warning_us);
3839 
3840  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3841 
3842  h->setKeyValue("type", new QoreStringNode("SOCKET-OPERATION-WARNING"), 0);
3843  h->setKeyValue("operation", new QoreStringNode(op), 0);
3844  h->setKeyValue("us", dt, 0);
3845  h->setKeyValue("timeout", tl_warning_us, 0);
3846  if (warn_callback_arg)
3847  h->setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3848 
3849  warn_queue->pushAndTakeRef(h);
3850  }
3851 
3852  DLLLOCAL void doThroughputWarning(bool send, int64 bytes, int64 dt, double bs) {
3853  assert(warn_queue);
3854  assert(bs < tp_warning_bs);
3855 
3856  QoreHashNode* h = new QoreHashNode(autoTypeInfo);
3857 
3858  h->setKeyValue("type", new QoreStringNode("SOCKET-THROUGHPUT-WARNING"), 0);
3859  h->setKeyValue("dir", new QoreStringNode(send ? "send" : "recv"), 0);
3860  h->setKeyValue("bytes", bytes, 0);
3861  h->setKeyValue("us", dt, 0);
3862  h->setKeyValue("bytes_sec", bs, 0);
3863  h->setKeyValue("threshold", (int64)tp_warning_bs, 0);
3864  if (warn_callback_arg)
3865  h->setKeyValue("arg", warn_callback_arg.refSelf(), 0);
3866 
3867  warn_queue->pushAndTakeRef(h);
3868  }
3869 
3870  DLLLOCAL bool pendingHttpChunkedBody() const {
3871  return http_exp_chunked_body && sock != QORE_INVALID_SOCKET;
3872  }
3873 
3874  DLLLOCAL void setSslVerifyMode(int mode) {
3875  //printd(5, "qore_socket_private::setSslVerifyMode() this: %p mode: %d\n", this, mode);
3876  ssl_verify_mode = mode;
3877  if (ssl)
3878  ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs, client_target);
3879  }
3880 
3881  DLLLOCAL void acceptAllCertificates(bool accept_all = true) {
3882  ssl_accept_all_certs = accept_all;
3883  if (ssl)
3884  ssl->setVerifyMode(ssl_verify_mode, ssl_accept_all_certs, client_target);
3885  }
3886 
3887  DLLLOCAL void setSslErrorString(QoreStringNode* err_str) {
3888  if (ssl_err_str) {
3889  ssl_err_str->concat("; ");
3890  ssl_err_str->concat(err_str);
3891  err_str->deref();
3892  } else {
3893  ssl_err_str = err_str;
3894  }
3895  }
3896 
3897  DLLLOCAL static void getUsageInfo(const QoreSocket& sock, QoreHashNode& h, const QoreSocket& s) {
3898  sock.priv->getUsageInfo(h, *s.priv);
3899  }
3900 
3901  DLLLOCAL static qore_socket_private* get(QoreSocket& sock) {
3902  return sock.priv;
3903  }
3904 
3905  DLLLOCAL static const qore_socket_private* get(const QoreSocket& sock) {
3906  return sock.priv;
3907  }
3908 
3909  DLLLOCAL static void captureRemoteCert(X509_STORE_CTX* x509_ctx);
3910 
3911  DLLLOCAL static QoreListNode* poll(const QoreListNode* poll_list, int timeout_ms, ExceptionSink* xsink);
3912 };
3913 
3914 #endif
DLLEXPORT const QoreEncoding * QCS_DEFAULT
the default encoding for the Qore library
DLLEXPORT QoreEncodingManager QEM
the QoreEncodingManager object
#define QORE_MIN(a, b)
macro to return the minimum of 2 numbers
Definition: QoreLib.h:550
DLLEXPORT QoreStringNode * q_strerror(int errnum)
returns the error string as a QoreStringNode
static void strtolower(char *str)
convert a string to lower-case in place
Definition: QoreLib.h:268
The base class for all value and parse types in Qore expression trees.
Definition: AbstractQoreNode.h:57
DLLEXPORT AbstractQoreNode * refSelf() const
returns "this" with an incremented reference count
virtual DLLEXPORT AbstractQoreNode * realCopy() const =0
returns a copy of the object; the caller owns the reference count
DLLEXPORT void deref(ExceptionSink *xsink)
decrements the reference count and calls derefImpl() if there_can_be_only_one is false,...
provides a safe and exception-safe way to release and re-acquire locks in Qore, only to be used on th...
Definition: QoreThreadLock.h:190
holds arbitrary binary data
Definition: BinaryNode.h:41
DLLEXPORT void append(const void *nptr, size_t size)
resizes the object and appends a copy of the data passed to the object
DLLEXPORT size_t size() const
returns the number of bytes in the object
DLLEXPORT bool empty() const
returns true if empty
DLLEXPORT void clear()
frees any managed memory and sets the size to 0
DLLEXPORT const void * getPtr() const
returns the pointer to the data
constant iterator class for QoreHashNode, to be only created on the stack
Definition: QoreHashNode.h:577
For use on the stack only: iterates through elements of a const QoreListNode.
Definition: QoreListNode.h:563
container for holding Qore-language exception information and also for registering a "thread_exit" ca...
Definition: ExceptionSink.h:48
DLLEXPORT AbstractQoreNode * raiseErrnoException(const char *err, int en, const char *fmt,...)
appends a Qore-language exception to the list and appends the result of strerror(errno) to the descri...
DLLEXPORT AbstractQoreNode * raiseExceptionArg(const char *err, QoreValue arg, const char *fmt,...)
appends a Qore-language exception to the list, and sets the 'arg' member (this object takes over the ...
DLLEXPORT AbstractQoreNode * raiseException(const char *err, const char *fmt,...)
appends a Qore-language exception to the list
Interface for private data of input streams.
Definition: InputStream.h:44
virtual int64 read(void *ptr, int64 limit, ExceptionSink *xsink)=0
Reads up to `limit` bytes from the input stream.
Interface for private data of output streams.
Definition: OutputStream.h:44
virtual void write(const void *ptr, int64 count, ExceptionSink *xsink)=0
Writes bytes to the output stream.
provides an interface to getaddrinfo
Definition: QoreNet.h:132
DLLLOCAL hashdecl addrinfo * getAddrInfo() const
returns the hashdecl addrinfo * being managed (may by 0)
Definition: QoreNet.h:159
DLLEXPORT int getInfo(ExceptionSink *xsink, const char *node, const char *service, int family=Q_AF_UNSPEC, int flags=0, int socktype=Q_SOCK_STREAM, int protocol=0)
get address info with the given parameters, if any errors occur, a Qore-language exception is thrown
static DLLEXPORT const char * getFamilyName(int address_family)
returns the name of the address family as a string (ie AF_INET = "ipv4", etc)
static DLLEXPORT QoreStringNode * getAddressDesc(int address_family, const char *addr)
returns a descriptive string for the address family and an address string (ie AF_INET6,...
defines string encoding functions in Qore
Definition: QoreEncoding.h:83
static DLLEXPORT const QoreEncoding * findCreate(const char *name)
finds an encoding if it exists (also looks up against alias names) and creates a new one if it doesn'...
This is the hash or associative list container type in Qore, dynamically allocated only,...
Definition: QoreHashNode.h:50
DLLEXPORT int setKeyValue(const char *key, QoreValue value, ExceptionSink *xsink)
sets the value of "key" to "value"
DLLEXPORT size_t size() const
returns the number of members in the hash, executes in constant time
DLLEXPORT bool empty() const
returns true if the hash has no members, false if not
DLLEXPORT QoreHashNode * copy() const
performs a copy of the hash and returns the new hash
DLLEXPORT QoreHashNode * hashRefSelf() const
returns "this" with an incremented reference count
This is the list container type in Qore, dynamically allocated only, reference counted.
Definition: QoreListNode.h:52
DLLEXPORT int push(QoreValue val, ExceptionSink *xsink)
adds a value to the list
Qore's arbitrary-precision number value type, dynamically-allocated only, reference counted.
Definition: QoreNumberNode.h:51
the implementation of Qore's object data type, reference counted, dynamically-allocated only
Definition: QoreObject.h:60
DLLEXPORT AbstractPrivateData * getReferencedPrivateData(qore_classid_t key, ExceptionSink *xsink) const
returns the private data corresponding to the class ID passed with an incremented reference count,...
DLLEXPORT void setValue(const char *key, QoreValue val, ExceptionSink *xsink)
sets the value of the given member to the given value
provides access to sockets using Qore data structures
Definition: QoreSocket.h:127
Qore's string type supported by the QoreEncoding class.
Definition: QoreString.h:93
DLLEXPORT void set(const char *str, const QoreEncoding *new_qorecharset=QCS_DEFAULT)
copies the c-string passed and sets the value of the string and its encoding
DLLEXPORT size_t strlen() const
returns number of bytes in the string (not including the null pointer)
DLLEXPORT void clear()
reset string to zero length; memory is not deallocated; string encoding does not change
DLLEXPORT void concat(const QoreString *str, ExceptionSink *xsink)
concatenates a string and converts encodings if necessary
DLLEXPORT int sprintf(const char *fmt,...)
this will concatentate a formatted string to the existing string according to the format string and t...
DLLEXPORT size_t size() const
returns number of bytes in the string (not including the null pointer)
DLLEXPORT char * giveBuffer()
returns the character buffer and leaves the QoreString empty, the caller owns the memory returned (mu...
DLLEXPORT void trim(const char *chars=0)
remove leading and trailing whitespace or other characters
DLLEXPORT bool empty() const
returns true if the string is empty, false if not
DLLEXPORT const char * c_str() const
returns the string's buffer; this data should not be changed
DLLEXPORT const char * getBuffer() const
returns the string's buffer; this data should not be changed
Qore's string value type, reference counted, dynamically-allocated only.
Definition: QoreStringNode.h:50
provides a mutually-exclusive thread lock
Definition: QoreThreadLock.h:49
DLLLOCAL T * release()
releases the pointer to the caller
Definition: ReferenceHolder.h:83
base class for resolved call references
Definition: CallReferenceNode.h:109
virtual DLLLOCAL QoreValue execValue(const QoreListNode *args, ExceptionSink *xsink) const =0
pure virtual function for executing the function reference
a helper class for getting socket origination information
Definition: QoreSocket.h:74
holds an object and dereferences it in the destructor
Definition: QoreValue.h:476
unsigned qore_classid_t
used for the unique class ID for QoreClass objects
Definition: common.h:79
intptr_t qore_offset_t
used for offsets that could be negative
Definition: common.h:76
long long int64
64bit integer type, cannot use int64_t here since it breaks the API on some 64-bit systems due to equ...
Definition: common.h:260
const qore_type_t NT_BOOLEAN
type value for bools (QoreValue only)
Definition: node_types.h:47
const qore_type_t NT_NUMBER
type value for QoreNumberNode
Definition: node_types.h:53
const qore_type_t NT_BINARY
type value for BinaryNode
Definition: node_types.h:49
const qore_type_t NT_LIST
type value for QoreListNode
Definition: node_types.h:50
const qore_type_t NT_NULL
type value for QoreNullNode
Definition: node_types.h:48
const qore_type_t NT_INT
type value for integers (QoreValue only)
Definition: node_types.h:43
const qore_type_t NT_STRING
type value for QoreStringNode
Definition: node_types.h:45
const qore_type_t NT_FLOAT
type value for floating-point values (QoreValue only)
Definition: node_types.h:44
const qore_type_t NT_HASH
type value for QoreHashNode
Definition: node_types.h:51
const qore_type_t NT_NOTHING
type value for QoreNothingNode
Definition: node_types.h:42
DLLEXPORT int q_gettid() noexcept
returns the current TID number
The main value class in Qore, designed to be passed by value.
Definition: QoreValue.h:275
DLLEXPORT void discard(ExceptionSink *xsink)
dereferences any contained AbstractQoreNode pointer and sets to 0; does not modify other values
DLLEXPORT QoreValue refSelf() const
references the contained value if type == QV_Node, returns itself