00001
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032 #ifdef _WIN32
00033 #ifdef _MSC_VER
00034 #pragma warning(disable:4786)
00035 #endif
00036 #endif
00037 #include <stdlib.h>
00038 #include <errno.h>
00039 #include <stdio.h>
00040
00041 #include "SocketHandler.h"
00042 #include "UdpSocket.h"
00043 #include "ResolvSocket.h"
00044 #include "ResolvServer.h"
00045 #include "TcpSocket.h"
00046 #include "IMutex.h"
00047 #include "Utility.h"
00048 #include "SocketAddress.h"
00049 #include "Exception.h"
00050 #include "SocketHandlerThread.h"
00051 #include "Lock.h"
00052
00053 #ifdef SOCKETS_NAMESPACE
00054 namespace SOCKETS_NAMESPACE {
00055 #endif
00056
00057 #ifdef _DEBUG
00058 #define DEB(x) x; fflush(stderr);
00059 #else
00060 #define DEB(x)
00061 #endif
00062
00063
00064 SocketHandler::SocketHandler(StdLog *p)
00065 :m_stdlog(p)
00066 ,m_mutex(m_mutex)
00067 ,m_b_use_mutex(false)
00068 ,m_parent(m_parent)
00069 ,m_b_parent_is_valid(false)
00070 ,m_release(NULL)
00071 ,m_maxsock(0)
00072 ,m_tlast(0)
00073 ,m_b_check_callonconnect(false)
00074 ,m_b_check_detach(false)
00075 ,m_b_check_timeout(false)
00076 ,m_b_check_retry(false)
00077 ,m_b_check_close(false)
00078 #ifdef ENABLE_SOCKS4
00079 ,m_socks4_host(0)
00080 ,m_socks4_port(0)
00081 ,m_bTryDirect(false)
00082 #endif
00083 #ifdef ENABLE_RESOLVER
00084 ,m_resolv_id(0)
00085 ,m_resolver(NULL)
00086 #endif
00087 #ifdef ENABLE_POOL
00088 ,m_b_enable_pool(false)
00089 #endif
00090 #ifdef ENABLE_DETACH
00091 ,m_slave(false)
00092 #endif
00093 {
00094 FD_ZERO(&m_rfds);
00095 FD_ZERO(&m_wfds);
00096 FD_ZERO(&m_efds);
00097 }
00098
00099
00100 SocketHandler::SocketHandler(IMutex& mutex,StdLog *p)
00101 :m_stdlog(p)
00102 ,m_mutex(mutex)
00103 ,m_b_use_mutex(true)
00104 ,m_parent(m_parent)
00105 ,m_b_parent_is_valid(false)
00106 ,m_release(NULL)
00107 ,m_maxsock(0)
00108 ,m_tlast(0)
00109 ,m_b_check_callonconnect(false)
00110 ,m_b_check_detach(false)
00111 ,m_b_check_timeout(false)
00112 ,m_b_check_retry(false)
00113 ,m_b_check_close(false)
00114 #ifdef ENABLE_SOCKS4
00115 ,m_socks4_host(0)
00116 ,m_socks4_port(0)
00117 ,m_bTryDirect(false)
00118 #endif
00119 #ifdef ENABLE_RESOLVER
00120 ,m_resolv_id(0)
00121 ,m_resolver(NULL)
00122 #endif
00123 #ifdef ENABLE_POOL
00124 ,m_b_enable_pool(false)
00125 #endif
00126 #ifdef ENABLE_DETACH
00127 ,m_slave(false)
00128 #endif
00129 {
00130 m_mutex.Lock();
00131 FD_ZERO(&m_rfds);
00132 FD_ZERO(&m_wfds);
00133 FD_ZERO(&m_efds);
00134 }
00135
00136
00137 SocketHandler::SocketHandler(IMutex& mutex, ISocketHandler& parent, StdLog *p)
00138 :m_stdlog(p)
00139 ,m_mutex(mutex)
00140 ,m_b_use_mutex(true)
00141 ,m_parent(parent)
00142 ,m_b_parent_is_valid(true)
00143 ,m_release(NULL)
00144 ,m_maxsock(0)
00145 ,m_tlast(0)
00146 ,m_b_check_callonconnect(false)
00147 ,m_b_check_detach(false)
00148 ,m_b_check_timeout(false)
00149 ,m_b_check_retry(false)
00150 ,m_b_check_close(false)
00151 #ifdef ENABLE_SOCKS4
00152 ,m_socks4_host(0)
00153 ,m_socks4_port(0)
00154 ,m_bTryDirect(false)
00155 #endif
00156 #ifdef ENABLE_RESOLVER
00157 ,m_resolv_id(0)
00158 ,m_resolver(NULL)
00159 #endif
00160 #ifdef ENABLE_POOL
00161 ,m_b_enable_pool(false)
00162 #endif
00163 #ifdef ENABLE_DETACH
00164 ,m_slave(false)
00165 #endif
00166 {
00167 m_mutex.Lock();
00168 FD_ZERO(&m_rfds);
00169 FD_ZERO(&m_wfds);
00170 FD_ZERO(&m_efds);
00171 }
00172
00173
00174 SocketHandler::~SocketHandler()
00175 {
00176 for (std::list<SocketHandlerThread *>::iterator it = m_threads.begin(); it != m_threads.end(); ++it)
00177 {
00178 SocketHandlerThread *p = *it;
00179 p -> Stop();
00180 }
00181 #ifdef ENABLE_RESOLVER
00182 if (m_resolver)
00183 {
00184 m_resolver -> Quit();
00185 }
00186 #endif
00187 {
00188 while (m_sockets.size())
00189 {
00190 DEB( fprintf(stderr, "Emptying sockets list in SocketHandler destructor, %d instances\n", (int)m_sockets.size());)
00191 socket_m::iterator it = m_sockets.begin();
00192 Socket *p = it -> second;
00193 if (p)
00194 {
00195 DEB( fprintf(stderr, " fd %d\n", p -> GetSocket());)
00196 p -> Close();
00197 DEB( fprintf(stderr, " fd closed %d\n", p -> GetSocket());)
00198
00199
00200
00201
00202
00203
00204
00205 if (p -> DeleteByHandler()
00206 #ifdef ENABLE_DETACH
00207 && !(m_slave ^ p -> IsDetached())
00208 #endif
00209 )
00210 {
00211 p -> SetErasedByHandler();
00212 delete p;
00213 }
00214 m_sockets.erase(it);
00215 }
00216 else
00217 {
00218 m_sockets.erase(it);
00219 }
00220 DEB( fprintf(stderr, "next\n");)
00221 }
00222 DEB( fprintf(stderr, "/Emptying sockets list in SocketHandler destructor, %d instances\n", (int)m_sockets.size());)
00223 }
00224 #ifdef ENABLE_RESOLVER
00225 if (m_resolver)
00226 {
00227 delete m_resolver;
00228 }
00229 #endif
00230 if (m_b_use_mutex)
00231 {
00232 m_mutex.Unlock();
00233 }
00234 }
00235
00236
00237 ISocketHandler *SocketHandler::Create(StdLog *log)
00238 {
00239 return new SocketHandler(log);
00240 }
00241
00242
00243 ISocketHandler *SocketHandler::Create(IMutex& mutex, ISocketHandler& parent, StdLog *log)
00244 {
00245 return new SocketHandler(mutex, parent, log);
00246 }
00247
00248
00249 bool SocketHandler::ParentHandlerIsValid()
00250 {
00251 return m_b_parent_is_valid;
00252 }
00253
00254
00255 ISocketHandler& SocketHandler::ParentHandler()
00256 {
00257 if (!m_b_parent_is_valid)
00258 throw Exception("No parent sockethandler available");
00259 return m_parent;
00260 }
00261
00262
00263 ISocketHandler& SocketHandler::GetRandomHandler()
00264 {
00265 if (m_threads.empty())
00266 throw Exception("SocketHandler is not multithreaded");
00267 size_t min_count = 99999;
00268 SocketHandlerThread *match = NULL;
00269 for (std::list<SocketHandlerThread *>::iterator it = m_threads.begin(); it != m_threads.end(); ++it)
00270 {
00271 SocketHandlerThread *thr = *it;
00272 ISocketHandler& h = thr -> Handler();
00273 {
00274 Lock lock(h.GetMutex());
00275 size_t sz = h.GetCount();
00276 if (sz < min_count)
00277 {
00278 min_count = sz;
00279 match = thr;
00280 }
00281 }
00282 }
00283 if (match)
00284 return match -> Handler();
00285 throw Exception("Can't locate free threaded sockethandler");
00286 }
00287
00288
00289 ISocketHandler& SocketHandler::GetEffectiveHandler()
00290 {
00291 return m_b_parent_is_valid ? m_parent : *this;
00292 }
00293
00294
00295 void SocketHandler::SetNumberOfThreads(size_t n)
00296 {
00297 if (!m_threads.empty())
00298 {
00299 return;
00300 }
00301 if (n > 1 && n < 256)
00302 {
00303 for (int i = 1; i <= (int)n; i++)
00304 {
00305 SocketHandlerThread *p = new SocketHandlerThread(*this);
00306 m_threads.push_back(p);
00307 p -> SetDeleteOnExit();
00308 p -> Start();
00309 p -> Wait();
00310 }
00311 }
00312 }
00313
00314
00315 bool SocketHandler::IsThreaded()
00316 {
00317 return !m_threads.empty();
00318 }
00319
00320
00321 void SocketHandler::EnableRelease()
00322 {
00323 if (m_release)
00324 return;
00325 m_release = new UdpSocket(*this);
00326 m_release -> SetDeleteByHandler();
00327 port_t port = 0;
00328 m_release -> Bind("127.0.0.1", port);
00329 Add(m_release);
00330 }
00331
00332
00333 void SocketHandler::Release()
00334 {
00335 if (!m_release)
00336 return;
00337 m_release -> SendTo("127.0.0.1", m_release -> GetPort(), "\n");
00338 }
00339
00340
00341 IMutex& SocketHandler::GetMutex() const
00342 {
00343 return m_mutex;
00344 }
00345
00346
00347 #ifdef ENABLE_DETACH
00348 void SocketHandler::SetSlave(bool x)
00349 {
00350 m_slave = x;
00351 }
00352
00353
00354 bool SocketHandler::IsSlave()
00355 {
00356 return m_slave;
00357 }
00358 #endif
00359
00360
00361 void SocketHandler::RegStdLog(StdLog *log)
00362 {
00363 m_stdlog = log;
00364 }
00365
00366
00367 void SocketHandler::LogError(Socket *p,const std::string& user_text,int err,const std::string& sys_err,loglevel_t t)
00368 {
00369 if (m_stdlog)
00370 {
00371 m_stdlog -> error(this, p, user_text, err, sys_err, t);
00372 }
00373 }
00374
00375
00376 void SocketHandler::Add(Socket *p)
00377 {
00378 m_add.push_back(p);
00379 }
00380
00381
00382 void SocketHandler::ISocketHandler_Add(Socket *p,bool bRead,bool bWrite)
00383 {
00384 Set(p, bRead, bWrite);
00385 }
00386
00387
00388 void SocketHandler::ISocketHandler_Mod(Socket *p,bool bRead,bool bWrite)
00389 {
00390 Set(p, bRead, bWrite);
00391 }
00392
00393
00394 void SocketHandler::ISocketHandler_Del(Socket *p)
00395 {
00396 Set(p, false, false);
00397 }
00398
00399
00400 void SocketHandler::Set(Socket *p,bool bRead,bool bWrite)
00401 {
00402 SOCKET s = p -> GetSocket();
00403 if (s >= 0)
00404 {
00405 bool bException = true;
00406 if (bRead)
00407 {
00408 if (!FD_ISSET(s, &m_rfds))
00409 {
00410 FD_SET(s, &m_rfds);
00411 }
00412 }
00413 else
00414 {
00415 FD_CLR(s, &m_rfds);
00416 }
00417 if (bWrite)
00418 {
00419 if (!FD_ISSET(s, &m_wfds))
00420 {
00421 FD_SET(s, &m_wfds);
00422 }
00423 }
00424 else
00425 {
00426 FD_CLR(s, &m_wfds);
00427 }
00428 if (bException)
00429 {
00430 if (!FD_ISSET(s, &m_efds))
00431 {
00432 FD_SET(s, &m_efds);
00433 }
00434 }
00435 else
00436 {
00437 FD_CLR(s, &m_efds);
00438 }
00439 }
00440 }
00441
00442
00443 #ifdef ENABLE_RESOLVER
00444 bool SocketHandler::Resolving(Socket *p0)
00445 {
00446 std::map<socketuid_t, bool>::iterator it = m_resolve_q.find(p0 -> UniqueIdentifier());
00447 return it != m_resolve_q.end();
00448 }
00449 #endif
00450
00451
00452 bool SocketHandler::Valid(Socket *p0)
00453 {
00454 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); ++it)
00455 {
00456 Socket *p = it -> second;
00457 if (p0 == p)
00458 return true;
00459 }
00460 return false;
00461 }
00462
00463
00464 bool SocketHandler::Valid(socketuid_t uid)
00465 {
00466 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); ++it)
00467 {
00468 Socket *p = it -> second;
00469 if (p -> UniqueIdentifier() == uid)
00470 return true;
00471 }
00472 return false;
00473 }
00474
00475
00476 bool SocketHandler::OkToAccept(Socket *)
00477 {
00478 return true;
00479 }
00480
00481
00482 size_t SocketHandler::GetCount()
00483 {
00484 return m_sockets.size() + m_add.size() + m_delete.size();
00485 }
00486
00487
00488 #ifdef ENABLE_SOCKS4
00489 void SocketHandler::SetSocks4Host(ipaddr_t a)
00490 {
00491 m_socks4_host = a;
00492 }
00493
00494
00495 void SocketHandler::SetSocks4Host(const std::string& host)
00496 {
00497 Utility::u2ip(host, m_socks4_host);
00498 }
00499
00500
00501 void SocketHandler::SetSocks4Port(port_t port)
00502 {
00503 m_socks4_port = port;
00504 }
00505
00506
00507 void SocketHandler::SetSocks4Userid(const std::string& id)
00508 {
00509 m_socks4_userid = id;
00510 }
00511 #endif
00512
00513
00514 #ifdef ENABLE_RESOLVER
00515 int SocketHandler::Resolve(Socket *p,const std::string& host,port_t port)
00516 {
00517
00518 ResolvSocket *resolv = new ResolvSocket(*this, p, host, port);
00519 resolv -> SetId(++m_resolv_id);
00520 resolv -> SetDeleteByHandler();
00521 ipaddr_t local;
00522 Utility::u2ip("127.0.0.1", local);
00523 if (!resolv -> Open(local, m_resolver_port))
00524 {
00525 LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL);
00526 }
00527 Add(resolv);
00528 m_resolve_q[p -> UniqueIdentifier()] = true;
00529 DEB( fprintf(stderr, " *** Resolve '%s:%d' id#%d m_resolve_q size: %d p: %p\n", host.c_str(), port, resolv -> GetId(), m_resolve_q.size(), p);)
00530 return resolv -> GetId();
00531 }
00532
00533
00534 #ifdef ENABLE_IPV6
00535 int SocketHandler::Resolve6(Socket *p,const std::string& host,port_t port)
00536 {
00537
00538 ResolvSocket *resolv = new ResolvSocket(*this, p, host, port, true);
00539 resolv -> SetId(++m_resolv_id);
00540 resolv -> SetDeleteByHandler();
00541 ipaddr_t local;
00542 Utility::u2ip("127.0.0.1", local);
00543 if (!resolv -> Open(local, m_resolver_port))
00544 {
00545 LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL);
00546 }
00547 Add(resolv);
00548 m_resolve_q[p -> UniqueIdentifier()] = true;
00549 return resolv -> GetId();
00550 }
00551 #endif
00552
00553
00554 int SocketHandler::Resolve(Socket *p,ipaddr_t a)
00555 {
00556
00557 ResolvSocket *resolv = new ResolvSocket(*this, p, a);
00558 resolv -> SetId(++m_resolv_id);
00559 resolv -> SetDeleteByHandler();
00560 ipaddr_t local;
00561 Utility::u2ip("127.0.0.1", local);
00562 if (!resolv -> Open(local, m_resolver_port))
00563 {
00564 LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL);
00565 }
00566 Add(resolv);
00567 m_resolve_q[p -> UniqueIdentifier()] = true;
00568 return resolv -> GetId();
00569 }
00570
00571
00572 #ifdef ENABLE_IPV6
00573 int SocketHandler::Resolve(Socket *p,in6_addr& a)
00574 {
00575
00576 ResolvSocket *resolv = new ResolvSocket(*this, p, a);
00577 resolv -> SetId(++m_resolv_id);
00578 resolv -> SetDeleteByHandler();
00579 ipaddr_t local;
00580 Utility::u2ip("127.0.0.1", local);
00581 if (!resolv -> Open(local, m_resolver_port))
00582 {
00583 LogError(resolv, "Resolve", -1, "Can't connect to local resolve server", LOG_LEVEL_FATAL);
00584 }
00585 Add(resolv);
00586 m_resolve_q[p -> UniqueIdentifier()] = true;
00587 return resolv -> GetId();
00588 }
00589 #endif
00590
00591
00592 void SocketHandler::EnableResolver(port_t port)
00593 {
00594 if (!m_resolver)
00595 {
00596 m_resolver_port = port;
00597 m_resolver = new ResolvServer(port);
00598 }
00599 }
00600
00601
00602 bool SocketHandler::ResolverReady()
00603 {
00604 return m_resolver ? m_resolver -> Ready() : false;
00605 }
00606 #endif // ENABLE_RESOLVER
00607
00608
00609 #ifdef ENABLE_SOCKS4
00610 void SocketHandler::SetSocks4TryDirect(bool x)
00611 {
00612 m_bTryDirect = x;
00613 }
00614
00615
00616 ipaddr_t SocketHandler::GetSocks4Host()
00617 {
00618 return m_socks4_host;
00619 }
00620
00621
00622 port_t SocketHandler::GetSocks4Port()
00623 {
00624 return m_socks4_port;
00625 }
00626
00627
00628 const std::string& SocketHandler::GetSocks4Userid()
00629 {
00630 return m_socks4_userid;
00631 }
00632
00633
00634 bool SocketHandler::Socks4TryDirect()
00635 {
00636 return m_bTryDirect;
00637 }
00638 #endif
00639
00640
00641 #ifdef ENABLE_RESOLVER
00642 bool SocketHandler::ResolverEnabled()
00643 {
00644 return m_resolver ? true : false;
00645 }
00646
00647
00648 port_t SocketHandler::GetResolverPort()
00649 {
00650 return m_resolver_port;
00651 }
00652 #endif // ENABLE_RESOLVER
00653
00654
00655 #ifdef ENABLE_POOL
00656 ISocketHandler::PoolSocket *SocketHandler::FindConnection(int type,const std::string& protocol,SocketAddress& ad)
00657 {
00658 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end() && m_sockets.size(); ++it)
00659 {
00660 PoolSocket *pools = dynamic_cast<PoolSocket *>(it -> second);
00661 if (pools)
00662 {
00663 if (pools -> GetSocketType() == type &&
00664 pools -> GetSocketProtocol() == protocol &&
00665
00666 *pools -> GetClientRemoteAddress() == ad)
00667 {
00668 m_sockets.erase(it);
00669 pools -> SetRetain();
00670 return pools;
00671 }
00672 }
00673 }
00674 return NULL;
00675 }
00676
00677
00678 void SocketHandler::EnablePool(bool x)
00679 {
00680 m_b_enable_pool = x;
00681 }
00682
00683
00684 bool SocketHandler::PoolEnabled()
00685 {
00686 return m_b_enable_pool;
00687 }
00688 #endif
00689
00690
00691 void SocketHandler::Remove(Socket *p)
00692 {
00693 #ifdef ENABLE_RESOLVER
00694 std::map<socketuid_t, bool>::iterator it4 = m_resolve_q.find(p -> UniqueIdentifier());
00695 if (it4 != m_resolve_q.end())
00696 m_resolve_q.erase(it4);
00697 #endif
00698 if (p -> ErasedByHandler())
00699 {
00700 return;
00701 }
00702 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); ++it)
00703 {
00704 if (it -> second == p)
00705 {
00706 LogError(p, "Remove", -1, "Socket destructor called while still in use", LOG_LEVEL_WARNING);
00707 m_sockets.erase(it);
00708 return;
00709 }
00710 }
00711 for (std::list<Socket *>::iterator it2 = m_add.begin(); it2 != m_add.end(); ++it2)
00712 {
00713 if (*it2 == p)
00714 {
00715 LogError(p, "Remove", -2, "Socket destructor called while still in use", LOG_LEVEL_WARNING);
00716 m_add.erase(it2);
00717 return;
00718 }
00719 }
00720 for (std::list<Socket *>::iterator it3 = m_delete.begin(); it3 != m_delete.end(); ++it3)
00721 {
00722 if (*it3 == p)
00723 {
00724 LogError(p, "Remove", -3, "Socket destructor called while still in use", LOG_LEVEL_WARNING);
00725 m_delete.erase(it3);
00726 return;
00727 }
00728 }
00729 }
00730
00731
00732 void SocketHandler::SetCallOnConnect(bool x)
00733 {
00734 m_b_check_callonconnect = x;
00735 }
00736
00737
00738 void SocketHandler::SetDetach(bool x)
00739 {
00740 m_b_check_detach = x;
00741 }
00742
00743
00744 void SocketHandler::SetTimeout(bool x)
00745 {
00746 m_b_check_timeout = x;
00747 }
00748
00749
00750 void SocketHandler::SetRetry(bool x)
00751 {
00752 m_b_check_retry = x;
00753 }
00754
00755
00756 void SocketHandler::SetClose(bool x)
00757 {
00758 m_b_check_close = x;
00759 }
00760
00761
00762 void SocketHandler::DeleteSocket(Socket *p)
00763 {
00764 p -> OnDelete();
00765 if (p -> DeleteByHandler() && !p -> ErasedByHandler())
00766 {
00767 p -> SetErasedByHandler();
00768 }
00769 m_fds_erase.push_back(p -> UniqueIdentifier());
00770 }
00771
00772
00773 void SocketHandler::RebuildFdset()
00774 {
00775 fd_set rfds;
00776 fd_set wfds;
00777 fd_set efds;
00778
00779 FD_ZERO(&rfds);
00780 FD_ZERO(&wfds);
00781 FD_ZERO(&efds);
00782 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); ++it)
00783 {
00784 SOCKET s = it -> first;
00785 Socket *p = it -> second;
00786 if (s == p -> GetSocket() && s >= 0)
00787 {
00788 fd_set fds;
00789 FD_ZERO(&fds);
00790 FD_SET(s, &fds);
00791 struct timeval tv;
00792 tv.tv_sec = 0;
00793 tv.tv_usec = 0;
00794 int n = select((int)s + 1, &fds, NULL, NULL, &tv);
00795 if (n == -1 && Errno == EBADF)
00796 {
00797
00798 LogError(p, "Select", (int)s, "Bad fd in fd_set (2)", LOG_LEVEL_ERROR);
00799 if (Valid(p) && Valid(p -> UniqueIdentifier()))
00800 {
00801 DeleteSocket(p);
00802 }
00803 }
00804 else
00805 {
00806 if (FD_ISSET(s, &m_rfds))
00807 FD_SET(s, &rfds);
00808 if (FD_ISSET(s, &m_wfds))
00809 FD_SET(s, &wfds);
00810 if (FD_ISSET(s, &m_efds))
00811 FD_SET(s, &efds);
00812 }
00813 }
00814 else
00815 {
00816
00817 LogError(p, "Select", (int)s, "Bad fd in fd_set (3)", LOG_LEVEL_ERROR);
00818 DeleteSocket(p);
00819 }
00820 }
00821 m_rfds = rfds;
00822 m_wfds = wfds;
00823 m_efds = efds;
00824 }
00825
00826
00827 void SocketHandler::AddIncoming()
00828 {
00829 while (m_add.size() > 0)
00830 {
00831 if (m_sockets.size() >= MaxCount())
00832 {
00833 LogError(NULL, "Select", (int)m_sockets.size(), "socket limit reached", LOG_LEVEL_WARNING);
00834 break;
00835 }
00836 std::list<Socket *>::iterator it = m_add.begin();
00837 Socket *p = *it;
00838 SOCKET s = p -> GetSocket();
00839 DEB( fprintf(stderr, "Trying to add fd %d, m_add.size() %d\n", (int)s, (int)m_add.size());)
00840
00841 if (s == INVALID_SOCKET)
00842 {
00843 LogError(p, "Add", -1, "Invalid socket", LOG_LEVEL_WARNING);
00844 m_delete.push_back(p);
00845 m_add.erase(it);
00846 continue;
00847 }
00848 socket_m::iterator it2;
00849 if ((it2 = m_sockets.find(s)) != m_sockets.end())
00850 {
00851 Socket *found = it2 -> second;
00852 if (p -> UniqueIdentifier() > found -> UniqueIdentifier())
00853 {
00854 LogError(p, "Add", (int)p -> GetSocket(), "Replacing socket already in controlled queue (newer uid)", LOG_LEVEL_WARNING);
00855
00856 DeleteSocket(found);
00857 }
00858 else
00859 if (p -> UniqueIdentifier() == found -> UniqueIdentifier())
00860 {
00861 LogError(p, "Add", (int)p -> GetSocket(), "Attempt to add socket already in controlled queue (same uid)", LOG_LEVEL_ERROR);
00862
00863 if (p != found)
00864 m_delete.push_back(p);
00865 m_add.erase(it);
00866 continue;
00867 }
00868 else
00869 {
00870 LogError(p, "Add", (int)p -> GetSocket(), "Attempt to add socket already in controlled queue (older uid)", LOG_LEVEL_FATAL);
00871
00872 m_delete.push_back(p);
00873 m_add.erase(it);
00874 continue;
00875 }
00876 }
00877 if (p -> CloseAndDelete())
00878 {
00879 LogError(p, "Add", (int)p -> GetSocket(), "Added socket with SetCloseAndDelete() true", LOG_LEVEL_WARNING);
00880 m_sockets[s] = p;
00881 DeleteSocket(p);
00882 p -> Close();
00883 }
00884 else
00885 {
00886 m_b_check_callonconnect |= p -> CallOnConnect();
00887 m_b_check_detach |= p -> IsDetach();
00888 m_b_check_timeout |= p -> CheckTimeout();
00889 m_b_check_retry |= p -> RetryClientConnect();
00890
00891
00892
00893
00894
00895
00896
00897
00898
00899
00900
00901
00902
00903
00904
00905
00906
00907
00908 StreamSocket *scp = dynamic_cast<StreamSocket *>(p);
00909 if (scp && scp -> Connecting())
00910 {
00911 ISocketHandler_Add(p,false,true);
00912 }
00913 else
00914 {
00915 TcpSocket *tcp = dynamic_cast<TcpSocket *>(p);
00916 bool bWrite = tcp ? tcp -> GetOutputLength() != 0 : false;
00917 if (p -> IsDisableRead())
00918 {
00919 ISocketHandler_Add(p, false, bWrite);
00920 }
00921 else
00922 {
00923 ISocketHandler_Add(p, true, bWrite);
00924 }
00925 }
00926 m_maxsock = (s > m_maxsock) ? s : m_maxsock;
00927 m_sockets[s] = p;
00928 }
00929
00930 m_add.erase(it);
00931 }
00932 }
00933
00934
00935 void SocketHandler::CheckErasedSockets()
00936 {
00937
00938 bool check_max_fd = false;
00939 while (m_fds_erase.size())
00940 {
00941 std::list<socketuid_t>::iterator it = m_fds_erase.begin();
00942 socketuid_t uid = *it;
00943 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); ++it)
00944 {
00945 Socket *p = it -> second;
00946 if (p -> UniqueIdentifier() == uid)
00947 {
00948
00949
00950
00951
00952
00953
00954
00955 if (p -> ErasedByHandler()
00956 #ifdef ENABLE_DETACH
00957 && !(m_slave ^ p -> IsDetached())
00958 #endif
00959 )
00960 {
00961 delete p;
00962 }
00963 m_sockets.erase(it);
00964 break;
00965 }
00966 }
00967 m_fds_erase.erase(it);
00968 check_max_fd = true;
00969 }
00970
00971 if (check_max_fd)
00972 {
00973 m_maxsock = 0;
00974 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); ++it)
00975 {
00976 SOCKET s = it -> first;
00977 m_maxsock = s > m_maxsock ? s : m_maxsock;
00978 }
00979 }
00980 }
00981
00982
00983 void SocketHandler::CheckCallOnConnect()
00984 {
00985 m_b_check_callonconnect = false;
00986 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); ++it)
00987 {
00988 Socket *p = it -> second;
00989 if (Valid(p) && Valid(p -> UniqueIdentifier()) && p -> CallOnConnect())
00990 {
00991 p -> SetConnected();
00992 #ifdef HAVE_OPENSSL
00993 if (p -> IsSSL())
00994 p -> OnSSLConnect();
00995 else
00996 #endif
00997 #ifdef ENABLE_SOCKS4
00998 if (p -> Socks4())
00999 p -> OnSocks4Connect();
01000 else
01001 #endif
01002 {
01003 TcpSocket *tcp = dynamic_cast<TcpSocket *>(p);
01004 if (tcp)
01005 {
01006 if (tcp -> GetOutputLength())
01007 {
01008 p -> OnWrite();
01009 }
01010 }
01011 #ifdef ENABLE_RECONNECT
01012 if (tcp && tcp -> IsReconnect())
01013 p -> OnReconnect();
01014 else
01015 #endif
01016 {
01017 p -> OnConnect();
01018 }
01019 }
01020 p -> SetCallOnConnect( false );
01021 m_b_check_callonconnect = true;
01022 }
01023 }
01024 }
01025
01026
01027 #ifdef ENABLE_DETACH
01028 void SocketHandler::CheckDetach()
01029 {
01030 m_b_check_detach = false;
01031 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); ++it)
01032 {
01033 Socket *p = it -> second;
01034 if (p -> IsDetach())
01035 {
01036 ISocketHandler_Del(p);
01037 m_sockets.erase(it);
01038
01039
01040 p -> DetachSocket();
01041
01042
01043
01044 m_b_check_detach = true;
01045 break;
01046 }
01047 }
01048 for (std::list<Socket *>::iterator it = m_add.begin(); it != m_add.end() && !m_b_check_detach; ++it)
01049 {
01050 Socket *p = *it;
01051 m_b_check_detach |= p -> IsDetach();
01052 }
01053 }
01054 #endif
01055
01056
01057 void SocketHandler::CheckTimeout(time_t tnow)
01058 {
01059 m_b_check_timeout = false;
01060 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); ++it)
01061 {
01062 Socket *p = it -> second;
01063 if (Valid(p) && Valid(p -> UniqueIdentifier()) && p -> CheckTimeout())
01064 {
01065 if (p -> Timeout(tnow))
01066 {
01067 StreamSocket *scp = dynamic_cast<StreamSocket *>(p);
01068 p -> SetTimeout(0);
01069 if (scp && scp -> Connecting())
01070 {
01071 p -> OnConnectTimeout();
01072
01073 p -> SetTimeout( scp -> GetConnectTimeout() );
01074 }
01075 else
01076 {
01077 p -> OnTimeout();
01078 }
01079 }
01080 m_b_check_timeout = true;
01081 }
01082 }
01083 }
01084
01085
01086 void SocketHandler::CheckRetry()
01087 {
01088 m_b_check_retry = false;
01089 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); ++it)
01090 {
01091 Socket *p = it -> second;
01092 if (Valid(p) && Valid(p -> UniqueIdentifier()) && p -> RetryClientConnect())
01093 {
01094 TcpSocket *tcp = dynamic_cast<TcpSocket *>(p);
01095 tcp -> SetRetryClientConnect(false);
01096 DEB( fprintf(stderr, "Close() before retry client connect\n");)
01097 p -> Close();
01098 std::auto_ptr<SocketAddress> ad = p -> GetClientRemoteAddress();
01099 if (ad.get())
01100 {
01101 tcp -> Open(*ad);
01102 }
01103 else
01104 {
01105 LogError(p, "RetryClientConnect", 0, "no address", LOG_LEVEL_ERROR);
01106 }
01107 Add(p);
01108 m_fds_erase.push_back(p -> UniqueIdentifier());
01109 m_b_check_retry = true;
01110 }
01111 }
01112 }
01113
01114
01115 void SocketHandler::CheckClose()
01116 {
01117 m_b_check_close = false;
01118 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); ++it)
01119 {
01120 Socket *p = it -> second;
01121 if (Valid(p) && Valid(p -> UniqueIdentifier()) && p -> CloseAndDelete() )
01122 {
01123 TcpSocket *tcp = dynamic_cast<TcpSocket *>(p);
01124 #ifdef ENABLE_RECONNECT
01125 if (p -> Lost() && !(tcp && tcp -> Reconnect()))
01126 #else
01127 if (p -> Lost())
01128 #endif
01129 {
01130
01131 DeleteSocket(p);
01132 }
01133 else
01134
01135 if (tcp && p -> IsConnected() && tcp -> GetFlushBeforeClose() &&
01136 #ifdef HAVE_OPENSSL
01137 !tcp -> IsSSL() &&
01138 #endif
01139 p -> TimeSinceClose() < 5)
01140 {
01141 DEB( fprintf(stderr, " close(1)\n");)
01142 if (tcp -> GetOutputLength())
01143 {
01144 LogError(p, "Closing", (int)tcp -> GetOutputLength(), "Sending all data before closing", LOG_LEVEL_INFO);
01145 }
01146 else
01147 if (!(tcp -> GetShutdown() & SHUT_WR))
01148 {
01149 SOCKET nn = it -> first;
01150 if (nn != INVALID_SOCKET && shutdown(nn, SHUT_WR) == -1)
01151 {
01152 LogError(p, "graceful shutdown", Errno, StrError(Errno), LOG_LEVEL_ERROR);
01153 }
01154 tcp -> SetShutdown(SHUT_WR);
01155 }
01156 else
01157 {
01158 ISocketHandler_Del(p);
01159 tcp -> Close();
01160 DeleteSocket(p);
01161 }
01162 }
01163 else
01164 #ifdef ENABLE_RECONNECT
01165 if (tcp && p -> IsConnected() && tcp -> Reconnect())
01166 {
01167 p -> SetCloseAndDelete(false);
01168 tcp -> SetIsReconnect();
01169 p -> SetConnected(false);
01170 DEB( fprintf(stderr, "Close() before reconnect\n");)
01171 p -> Close();
01172 p -> OnDisconnect();
01173 std::auto_ptr<SocketAddress> ad = p -> GetClientRemoteAddress();
01174 if (ad.get())
01175 {
01176 tcp -> Open(*ad);
01177 }
01178 else
01179 {
01180 LogError(p, "Reconnect", 0, "no address", LOG_LEVEL_ERROR);
01181 }
01182 tcp -> ResetConnectionRetries();
01183 Add(p);
01184 m_fds_erase.push_back(p -> UniqueIdentifier());
01185 }
01186 else
01187 #endif
01188 {
01189 if (tcp && p -> IsConnected() && tcp -> GetOutputLength())
01190 {
01191 LogError(p, "Closing", (int)tcp -> GetOutputLength(), "Closing socket while data still left to send", LOG_LEVEL_WARNING);
01192 }
01193 #ifdef ENABLE_POOL
01194 if (p -> Retain() && !p -> Lost())
01195 {
01196 PoolSocket *p2 = new PoolSocket(*this, p);
01197 p2 -> SetDeleteByHandler();
01198 Add(p2);
01199
01200 p -> SetCloseAndDelete(false);
01201 }
01202 else
01203 #endif // ENABLE_POOL
01204 {
01205 ISocketHandler_Del(p);
01206 DEB( fprintf(stderr, "Close() before OnDelete\n");)
01207 p -> Close();
01208 }
01209 DeleteSocket(p);
01210 }
01211 m_b_check_close = true;
01212 }
01213 }
01214 }
01215
01216
01217 int SocketHandler::ISocketHandler_Select(struct timeval *tsel)
01218 {
01219 #ifdef MACOSX
01220 fd_set rfds;
01221 fd_set wfds;
01222 fd_set efds;
01223 FD_COPY(&m_rfds, &rfds);
01224 FD_COPY(&m_wfds, &wfds);
01225 FD_COPY(&m_efds, &efds);
01226 #else
01227 fd_set rfds = m_rfds;
01228 fd_set wfds = m_wfds;
01229 fd_set efds = m_efds;
01230 #endif
01231 int n;
01232 DEB(
01233 printf("select( %d, [", m_maxsock + 1);
01234 for (size_t i = 0; i <= m_maxsock; i++)
01235 if (FD_ISSET(i, &rfds))
01236 printf(" %d", i);
01237 printf("], [");
01238 for (size_t i = 0; i <= m_maxsock; i++)
01239 if (FD_ISSET(i, &wfds))
01240 printf(" %d", i);
01241 printf("], [");
01242 for (size_t i = 0; i <= m_maxsock; i++)
01243 if (FD_ISSET(i, &efds))
01244 printf(" %d", i);
01245 printf("]\n");
01246 )
01247 if (m_b_use_mutex)
01248 {
01249 m_mutex.Unlock();
01250 n = select( (int)(m_maxsock + 1),&rfds,&wfds,&efds,tsel);
01251 m_mutex.Lock();
01252 }
01253 else
01254 {
01255 n = select( (int)(m_maxsock + 1),&rfds,&wfds,&efds,tsel);
01256 }
01257 if (n == -1)
01258 {
01259 int err = Errno;
01260
01261
01262
01263
01264
01265
01266 #ifdef _WIN32
01267 switch (err)
01268 {
01269 case WSAENOTSOCK:
01270 RebuildFdset();
01271 break;
01272 case WSAEINTR:
01273 case WSAEINPROGRESS:
01274 break;
01275 case WSAEINVAL:
01276 LogError(NULL, "SocketHandler::Select", err, StrError(err), LOG_LEVEL_FATAL);
01277 throw Exception("select(n): n is negative. Or struct timeval contains bad time values (<0).");
01278 case WSAEFAULT:
01279 LogError(NULL, "SocketHandler::Select", err, StrError(err), LOG_LEVEL_ERROR);
01280 break;
01281 case WSANOTINITIALISED:
01282 throw Exception("WSAStartup not successfully called");
01283 case WSAENETDOWN:
01284 throw Exception("Network subsystem failure");
01285 }
01286 #else
01287 switch (err)
01288 {
01289 case EBADF:
01290 RebuildFdset();
01291 break;
01292 case EINTR:
01293 break;
01294 case EINVAL:
01295 LogError(NULL, "SocketHandler::Select", err, StrError(err), LOG_LEVEL_FATAL);
01296 throw Exception("select(n): n is negative. Or struct timeval contains bad time values (<0).");
01297 case ENOMEM:
01298 LogError(NULL, "SocketHandler::Select", err, StrError(err), LOG_LEVEL_ERROR);
01299 break;
01300 }
01301 #endif
01302 printf("error on select(): %d %s\n", Errno, StrError(err));
01303 }
01304 else
01305 if (!n)
01306 {
01307 }
01308 else
01309 if (n > 0)
01310 {
01311 for (socket_m::iterator it = m_sockets.begin(); it != m_sockets.end(); ++it)
01312 {
01313 SOCKET i = it -> first;
01314 Socket *p = it -> second;
01315
01316 if (FD_ISSET(i, &rfds))
01317 {
01318 #ifdef HAVE_OPENSSL
01319 if (p -> IsSSLNegotiate())
01320 {
01321 p -> SSLNegotiate();
01322 }
01323 else
01324 #endif
01325 {
01326 p -> OnRead();
01327 }
01328 }
01329
01330 if (FD_ISSET(i, &wfds))
01331 {
01332 #ifdef HAVE_OPENSSL
01333 if (p -> IsSSLNegotiate())
01334 {
01335 p -> SSLNegotiate();
01336 }
01337 else
01338 #endif
01339 {
01340 p -> OnWrite();
01341 }
01342 }
01343
01344 if (FD_ISSET(i, &efds))
01345 {
01346 p -> OnException();
01347 }
01348 }
01349 }
01350 return n;
01351 }
01352
01353
01354 int SocketHandler::Select(long sec,long usec)
01355 {
01356 struct timeval tv;
01357 tv.tv_sec = sec;
01358 tv.tv_usec = usec;
01359 return Select(&tv);
01360 }
01361
01362
01363 int SocketHandler::Select()
01364 {
01365 if (m_b_check_callonconnect ||
01366 m_b_check_detach ||
01367 m_b_check_timeout ||
01368 m_b_check_retry ||
01369 m_b_check_close)
01370 {
01371 return Select(0, m_b_check_detach ? 10000 : 200000);
01372 }
01373 return Select(NULL);
01374 }
01375
01376
01377 int SocketHandler::Select(struct timeval *tsel)
01378 {
01379 if (!m_add.empty())
01380 {
01381 AddIncoming();
01382 }
01383 int n = ISocketHandler_Select(tsel);
01384
01385 if (m_b_check_callonconnect)
01386 {
01387 CheckCallOnConnect();
01388 }
01389
01390 #ifdef ENABLE_DETACH
01391
01392 if (!m_slave && m_b_check_detach)
01393 {
01394 CheckDetach();
01395 }
01396 #endif
01397
01398
01399 if (m_b_check_timeout)
01400 {
01401 time_t tnow = time(NULL);
01402 if (tnow != m_tlast)
01403 {
01404 CheckTimeout(tnow);
01405 m_tlast = tnow;
01406 }
01407 }
01408
01409
01410 if (m_b_check_retry)
01411 {
01412 CheckRetry();
01413 }
01414
01415
01416 if (m_b_check_close)
01417 {
01418 CheckClose();
01419 }
01420
01421 if (!m_fds_erase.empty())
01422 {
01423 CheckErasedSockets();
01424 }
01425
01426
01427 while (m_delete.size())
01428 {
01429 std::list<Socket *>::iterator it = m_delete.begin();
01430 Socket *p = *it;
01431 p -> OnDelete();
01432 m_delete.erase(it);
01433 if (p -> DeleteByHandler()
01434 #ifdef ENABLE_DETACH
01435 && !(m_slave ^ p -> IsDetached())
01436 #endif
01437 )
01438 {
01439 p -> SetErasedByHandler();
01440 delete p;
01441 }
01442 }
01443
01444 return n;
01445 }
01446
01447
01448 #ifdef SOCKETS_NAMESPACE
01449 }
01450 #endif
01451