Logo
~Apps~
~Projects~
~Contact~


Session.cpp

Go to the documentation of this file.
00001 
00006 /*
00007 Copyright (C) 2005  Anders Hedstrom
00008 
00009 This program is free software; you can redistribute it and/or
00010 modify it under the terms of the GNU General Public License
00011 as published by the Free Software Foundation; either version 2
00012 of the License, or (at your option) any later version.
00013 
00014 This program is distributed in the hope that it will be useful,
00015 but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00017 GNU General Public License for more details.
00018 
00019 You should have received a copy of the GNU General Public License
00020 along with this program; if not, write to the Free Software
00021 Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA  02111-1307, USA.
00022 */
00023 #ifdef _WIN32
00024 #pragma warning(disable:4786)
00025 #define random rand
00026 typedef unsigned __int32 uint32_t;
00027 #endif
00028 #include <sys/stat.h>
00029 #include <sys/types.h>
00030 #include <vector>
00031 #include <Uid.h>
00032 #include <Utility.h>
00033 
00034 #include "PeerHandler.h"
00035 #include "Peer.h"
00036 #include "bitmap_t.h"
00037 #include "Piece.h"
00038 #include "pSocket.h"
00039 #include "FileManager.h"
00040 #include "Clock.h"
00041 #include "Session.h"
00042 
00043 #ifdef _DEBUG
00044 #define DEB(x) x
00045 #else
00046 #define DEB(x)
00047 #endif
00048 
00049 
00050 
00051 Session::Session(ISocketHandler& h,const std::string& hash)
00052 :m_handler(h)
00053 ,m_info_hash(hash)
00054 ,m_piece_length(0)
00055 ,m_length(0)
00056 ,m_number_of_pieces(0)
00057 ,m_last_length(0)
00058 ,m_interval(60)
00059 ,m_t_tracker(0)
00060 //,m_bitmap(NULL)
00061 ,m_prev_offset(0)
00062 ,m_prev_length(0)
00063 ,m_length_one(0)
00064 ,m_filemanager(NULL)
00065 ,m_b_check_complete(false)
00066 ,m_demon(false)
00067 {
00068 DEB(    printf("%s\n", hash.c_str());)
00069         Uid uid;
00070         memcpy(m_peer_id, "-++0001-", 8);
00071         memcpy(m_peer_id + 8, uid.GetBuf(), 12);
00072 }
00073 
00074 
00075 Session::~Session()
00076 {
00077         Save();
00078         while (m_files.size())
00079         {
00080                 file_v::iterator it = m_files.begin();
00081                 file_t *p = *it;
00082                 delete p;
00083                 m_files.erase(it);
00084         }
00085         while (m_peers.size())
00086         {
00087                 peer_v::iterator it = m_peers.begin();
00088                 Peer *p = *it;
00089                 delete p;
00090                 m_peers.erase(it);
00091         }
00092         while (m_complete.size())
00093         {
00094                 piece_v::iterator it = m_complete.begin();
00095                 Piece *p = *it;
00096                 delete p;
00097                 m_complete.erase(it);
00098         }
00099         while (m_incomplete.size())
00100         {
00101                 piece_v::iterator it = m_incomplete.begin();
00102                 Piece *p = *it;
00103                 delete p;
00104                 m_incomplete.erase(it);
00105         }
00106         if (m_filemanager)
00107                 delete m_filemanager;
00108 }
00109 
00110 
00111 void Session::AddFile(int64_t length)
00112 {
00113         AddFile(m_name, length);
00114         m_length_one = length;
00115 }
00116 
00117 
00118 void Session::AddFile(const std::string& path,int64_t length)
00119 {
00120 DEB(printf("    %s (%lld @ %lld)\n", path.c_str(), length, m_prev_offset + m_prev_length);)
00121         file_t *p = new file_t(path, length);
00122         m_length += length;
00123         p -> offset = m_prev_offset + m_prev_length;
00124         m_files.push_back(p);
00125         m_prev_offset = p -> offset;
00126         m_prev_length = p -> length;
00127 }
00128 
00129 
00130 // load peers, complete, incomplete
00131 void Session::Load()
00132 {
00133         std::string filename = GetBitmapFilename();
00134         FILE *fil = fopen(filename.c_str(), "rb");
00135         if (fil)
00136         {
00137                 size_t q;
00138                 fread(&q, sizeof(size_t), 1, fil); // number of peers
00139                 while (q--)
00140                 {
00141                         char c;
00142                         char ip[100];
00143                         fread(&c, 1, 1, fil); // length of ip
00144                         fread(ip, 1, c, fil);
00145                         ip[ (int)c] = 0;
00146                         char id[20];
00147                         fread(id, 1, 20, fil);
00148                         port_t port;
00149                         fread(&port, sizeof(port_t), 1, fil);
00150                         if (!GetPeer(ip) && port)
00151                         {
00152                                 Peer *p = new Peer(Handler(),m_info_hash,ip,static_cast<std::string>(id).substr(0,20),port);
00153                                 AddPeer(p);
00154                         }
00155                 }
00156                 load_piece_v(fil, m_complete);
00157                 load_piece_v(fil, m_incomplete);
00158                 fclose(fil);
00159                 // ...
00160                 for (piece_v::iterator it = m_complete.begin(); it != m_complete.end(); it++)
00161                 {
00162                         Piece *p = *it;
00163                         p -> ClearRequested();
00164                 }
00165                 {
00166                         for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end(); it++)
00167                         {
00168                                 Piece *p = *it;
00169                                 p -> ClearRequested();
00170                         }
00171                 }
00172                 // verify loaded data
00173 /*
00174                 while (m_peers.size())
00175                 {
00176                         peer_v::iterator it = m_peers.begin();
00177                         Peer *peer = *it;
00178                         delete peer;
00179                         m_peers.erase(it);
00180                 }
00181 */
00182                 return;
00183         }
00184         // no file to load
00185         for (size_t i = 0; i < m_number_of_pieces; i++)
00186         {
00187                 Piece *p;
00188                 if (i < m_number_of_pieces - 1)
00189                 {
00190                         p = new Piece(i, m_piece_length);
00191                 }
00192                 else
00193                 {
00194                         p = new Piece(i, m_last_length);
00195                 }
00196                 m_incomplete.push_back(p);
00197         }
00198         Verify();
00199 }
00200 
00201 
00202 // save peers, complete, incomplete
00203 void Session::Save()
00204 {
00205         std::string filename = GetBitmapFilename();
00206         static_cast<PeerHandler&>(Handler()).mkpath( filename );
00207         FILE *fil = fopen(filename.c_str(), "wb");
00208         if (fil)
00209         {
00210                 size_t q = m_peers.size();
00211                 fwrite(&q, sizeof(size_t), 1, fil);
00212                 for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00213                 {
00214                         Peer *peer = *it;
00215                         char c = peer -> GetIP().size();
00216                         fwrite(&c, 1, 1, fil);
00217                         fwrite(peer -> GetIP().c_str(), 1, c, fil);
00218                         fwrite(peer -> GetID().c_str(), 1, 20, fil);
00219                         port_t p = peer -> GetPort();
00220                         fwrite(&p, sizeof(port_t), 1, fil);
00221                 }
00222                 save_piece_v(fil, m_complete);
00223                 save_piece_v(fil, m_incomplete);
00224                 fclose(fil);
00225         }
00226 DEB(    printf("Save: %s\n", m_info_hash.c_str());)
00227 }
00228 
00229 
00230 std::string Session::GetBitmapFilename()
00231 {
00232         std::string torrentdir = static_cast<PeerHandler&>(Handler()).GetTorrentDirectory();
00233         return torrentdir + "/" + m_info_hash + "/." + m_info_hash;
00234 }
00235 
00236 
00237 Peer *Session::GetPeer(const std::string& ip)
00238 {
00239         for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00240         {
00241                 Peer *p = *it;
00242                 if (p -> GetIP() == ip)
00243                         return p;
00244         }
00245         return NULL;
00246 }
00247 
00248 
00249 std::string Session::GetAnnounceUrl(const std::string& event)
00250 {
00251         std::string url;
00252         char slask[200];
00253 
00254         url = GetAnnounce();
00255         url += "?info_hash=";
00256         for (size_t i = 0; i < 40; i += 2)
00257         {
00258                 url += '%';
00259                 url += m_info_hash.substr(i,2);
00260         }
00261 
00262         strcpy(slask,"&peer_id=");
00263         {
00264                 for (size_t i = 0; i < 20; i++)
00265                 {
00266                         unsigned short x = m_peer_id[i];
00267                         sprintf(slask + strlen(slask),"%%%02x",x);
00268                 }
00269         }
00270         url += slask; // peer_id
00271         url += "&ip=" + static_cast<PeerHandler&>(Handler()).GetExternIP();
00272         sprintf(slask,"&port=%d", static_cast<PeerHandler&>(Handler()).GetListenPort() );
00273         url += slask; // port
00274         url += "&uploaded=0";
00275         url += "&downloaded=0";
00276         url += "&left=1";
00277         if (event.size())
00278                 url += "&event=" + event;
00279         url += "&numwant=" + Utility::l2string(static_cast<PeerHandler&>(Handler()).GetMinPeers() * 2);
00280 
00281         return url;
00282 }
00283 
00284 
00285 void Session::SetPieceLength(int64_t x)
00286 {
00287         m_piece_length = x;
00288         if (m_piece_length % SLICE)
00289         {
00290                 printf("+---------------------------------\n");
00291                 printf(" Odd piece length: %lld\n", x);
00292                 printf("+---------------------------------\n");
00293         }
00294 }
00295 
00296 
00297 void Session::SetPieces(const std::string& x)
00298 {
00299         m_pieces = x;
00300         m_number_of_pieces = x.size() / 20;
00301         m_last_length = m_length % m_piece_length;
00302         if (!m_last_length)
00303                 m_last_length = m_piece_length;
00304 }
00305 
00306 
00307 void Session::AddConnect()
00308 {
00309         PeerHandler& ref = static_cast<PeerHandler&>(Handler());
00310         std::vector<Peer *> available;
00311         time_t now = time(NULL);
00312         for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00313         {
00314                 Peer *peer = *it;
00315                 if (peer -> Failed())
00316                 {
00317                         m_peers.erase(it);
00318                         break;
00319                 }
00320         }
00321         {
00322                 for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00323                 {
00324                         Peer *peer = *it;
00325                         if (!peer -> Connected() && 
00326                                 now > peer -> GetChokeTime() + ref.GetChokeTimer() && 
00327                                 !peer -> Tried() &&
00328                                 !peer -> Failed())
00329                         {
00330                                 available.push_back(peer);
00331                         }
00332                 }
00333         }
00334         size_t q;
00335         if ((q = available.size()) > 0)
00336         {
00337                 Peer *peer = available[random() % q];
00338                 pSocket *p = new pSocket(Handler(), m_info_hash, (unsigned char *)peer -> GetID().c_str());
00339                 p -> Open(peer -> GetIP(), peer -> GetPort());
00340                 p -> SetDeleteByHandler();
00341                 Handler().Add(p);
00342                 peer -> SetTried();
00343         }
00344         else
00345         {
00346                 for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00347                 {
00348                         Peer *peer = *it;
00349                         peer -> SetTried(false);
00350                 }
00351         }
00352 }
00353 
00354 
00355 bool Session::GetRandomNotRequested(Peer *peer,size_t& piece,size_t& offset,size_t& length)
00356 {
00357         std::map<size_t,std::vector<Piece *> > mmap;
00358         size_t q[1000];
00359         size_t max = 0;
00360         memset(q, 0, sizeof(size_t[1000]));
00361 //DEB(printf("Find most complete piece(s)\n");)
00362         for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end(); it++)
00363         {
00364                 Piece *p = *it;
00365                 if (peer -> IsSet(p -> GetNumber()) && !p -> AllRequested())
00366                 {
00367                         size_t n = p -> NumberComplete();
00368                         n = p -> NumberRequested();
00369                         if (n < 1000)
00370                         {
00371                                 q[n]++;
00372                                 mmap[n].push_back(p);
00373                                 max = MAX(max,n);
00374                         }
00375                 }
00376         }
00377 /*
00378 DEB(    if (max > 0)
00379         {
00380                 printf("Top pieces(%d/%d):", max, m_piece_length / SLICE);
00381                 for (std::vector<Piece *>::iterator it = mmap[max].begin(); it != mmap[max].end(); it++)
00382                 {
00383                         Piece *p = *it;
00384                         printf(" %d", p -> GetNumber());
00385                 }
00386                 printf("\n");
00387         })
00388 */
00389 //DEB(printf("Select most complete piece\n");)
00390         while (max > 0)
00391         {
00392                 if (q[max] && (!max || (random() % 13 > 0)) )
00393                 {
00394                         std::vector<Piece *>& ref = mmap[max];
00395                         Piece *p = ref[random() % ref.size()];
00396                         piece = p -> GetNumber();
00397                         return p -> GetRandomNotRequested(offset, length);
00398                 }
00399                 max--;
00400         }
00401         //
00402         {
00403                 if (q[max])
00404                 {
00405                         std::vector<Piece *>& ref = mmap[max];
00406                         Piece *p = ref[random() % ref.size()];
00407                         piece = p -> GetNumber();
00408                         return p -> GetRandomNotRequested(offset, length);
00409                 }
00410         }
00411         // find rarest piece to begin with
00412 /*
00413         if (q[max])
00414         {
00415 DEB(printf("Find rarest piece\n");)
00416                 std::vector<Piece *>& ref = mmap[max];
00417                 std::vector<Piece *> available;
00418                 size_t qinst = 9999999;
00419                 for (std::vector<Piece *>::iterator it = ref.begin(); it != ref.end(); it++)
00420                 {
00421                         Piece *p = *it;
00422                         size_t n = GetInstances(p -> GetNumber());
00423                         if (n && n < qinst)
00424                         {
00425                                 while (available.size())
00426                                         available.erase(available.begin());
00427                                 available.push_back(p);
00428                                 qinst = n;
00429                         }
00430                         else
00431                         if (n == qinst)
00432                         {
00433                                 available.push_back(p);
00434                         }
00435                 }
00436                 size_t q;
00437                 if ((q = available.size()) > 0)
00438                 {
00439                         Piece *p = available[random() % q];
00440                         piece = p -> GetNumber();
00441 DEB(printf("Random Rare: %d (out of %d available)\n", piece, q);)
00442                         return p -> GetRandomNotRequested(offset, length);
00443                 }
00444         }
00445 */
00446         return false;
00447 }
00448 
00449 
00450 Request *Session::AddRequest(Peer *peer,size_t piece,size_t offset,size_t length)
00451 {
00452         pSocket *sock = peer -> PeerSocket();
00453         for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end(); it++)
00454         {
00455                 Piece *p = *it;
00456                 if (p -> GetNumber() == piece)
00457                 {
00458                         // TODO: check status
00459                         p -> SetRequested(offset, length);
00460                         if (sock)
00461                         {
00462                                 sock -> SendRequest(piece, offset, length);
00463                         }
00464                         return new Request(piece, offset, length);
00465                 }
00466         }
00467         return NULL;
00468 }
00469 
00470 
00471 void Session::RemoveRequest(Peer *peer,size_t piece,size_t offset,size_t length)
00472 {
00473         pSocket *sock = peer -> PeerSocket();
00474         for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end(); it++)
00475         {
00476                 Piece *p = *it;
00477                 if (p -> GetNumber() == piece)
00478                 {
00479                         // TODO: check status
00480                         p -> SetRequested(offset, 0);
00481                         if (sock)
00482                         {
00483                                 sock -> SendCancel(piece, offset, length);
00484                         }
00485                         return;
00486                 }
00487         }
00488 }
00489 
00490 
00491 void Session::SaveSlice(size_t piece,size_t offset,size_t length,unsigned char *buf)
00492 {
00493         for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end(); it++)
00494         {
00495                 Piece *p = *it;
00496                 if (p -> GetNumber() == piece)
00497                 {
00498                         p -> SetComplete(offset, length);
00499                 }
00500         }
00501         // FileManager
00502         // void Write(size_t index,unsigned char *buf,size_t length,size_t begin);
00503         if (m_filemanager)
00504         {
00505                 m_filemanager -> Write(piece, buf, length, offset);
00506         }
00507 }
00508 
00509 
00510 void Session::CreateFileManager()
00511 {
00512         m_filemanager = new FileManager(static_cast<PeerHandler&>(Handler()), m_info_hash);
00513 }
00514 
00515 
00516 Piece *Session::GetIncomplete(size_t piece)
00517 {
00518         for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end(); it++)
00519         {
00520                 Piece *p = *it;
00521                 if (p -> GetNumber() == piece)
00522                         return p;
00523         }
00524         return NULL;
00525 }
00526 
00527 
00528 Piece *Session::GetComplete(size_t piece)
00529 {
00530         for (piece_v::iterator it = m_complete.begin(); it != m_complete.end(); it++)
00531         {
00532                 Piece *p = *it;
00533                 if (p -> GetNumber() == piece)
00534                         return p;
00535         }
00536         return NULL;
00537 }
00538 
00539 
00540 void Session::Verify()
00541 {
00542         bool repeat;
00543         for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end(); it++)
00544         {
00545                 Piece *p = *it;
00546                 if (m_filemanager -> Verify(p -> GetNumber(),p -> PieceLength()) )
00547                 {
00548                         for (size_t i = 0; i < p -> PieceLength(); i += p -> SliceSize())
00549                         {
00550                                 p -> SetComplete(i, MIN(p -> SliceSize(),p -> PieceLength() - i) );
00551                         }
00552                 }
00553         }
00554         do {
00555                 repeat = false;
00556                 for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end(); it++)
00557                 {
00558                         Piece *p = *it;
00559                         if (p -> Complete())
00560                         {
00561                                 m_complete.push_back(p);
00562                                 p -> ClearComplete();
00563                                 p -> ClearRequested();
00564                                 m_incomplete.erase(it);
00565                                 repeat = true;
00566                                 break;
00567                         }
00568                 }
00569         } while (repeat);
00570 }
00571 
00572 
00573 void Session::SendSlice(pSocket *sock,size_t piece,size_t offset,size_t length)
00574 {
00575         Piece *p = GetComplete(piece);
00576         if (p)
00577         {
00578                 unsigned char *buf = new unsigned char[m_piece_length];
00579                 m_filemanager -> ReadPiece(piece, buf, m_piece_length );
00580                 uint32_t l = htonl(length + 9);
00581                 sock -> SendBuf( (char *)&l, 4);
00582                 sock -> Send("\07");
00583                 l = htonl(piece);
00584                 sock -> SendBuf( (char *)&l, 4);
00585                 l = htonl(offset);
00586                 sock -> SendBuf( (char *)&l, 4);
00587                 sock -> SendBuf( (char *)&buf[offset], length);
00588                 delete buf;
00589                 p -> SetRequested(offset, length);
00590         }
00591 }
00592 
00593 
00594 bool Session::SliceSent(size_t piece,size_t offset)
00595 {
00596         Piece *p = GetComplete(piece);
00597         if (p)
00598         {
00599                 if (p -> Requested(offset))
00600                         return true;
00601         }
00602         return false;
00603 }
00604 
00605 
00606 void Session::save_piece_v(FILE *fil,piece_v& ref)
00607 {
00608         size_t q = ref.size();
00609         fwrite(&q, sizeof(size_t), 1, fil);
00610         for (piece_v::iterator it = ref.begin(); it != ref.end(); it++)
00611         {
00612                 Piece *p = *it;
00613                 size_t nr = p -> GetNumber();
00614                 fwrite(&nr, sizeof(size_t), 1, fil);
00615                 nr = p -> PieceLength();
00616                 fwrite(&nr, sizeof(size_t), 1, fil);
00617                 p -> save_slice_m(fil, p -> MapComplete());
00618                 p -> save_slice_m(fil, p -> MapRequested());
00619         }
00620 }
00621 
00622 
00623 void Session::load_piece_v(FILE *fil,piece_v& ref)
00624 {
00625         size_t q;
00626         fread(&q, sizeof(size_t), 1, fil);
00627         while (q--)
00628         {
00629                 size_t nr, piece_length;
00630                 fread(&nr, sizeof(size_t), 1, fil);
00631                 fread(&piece_length, sizeof(size_t), 1, fil);
00632                 Piece *p = new Piece(nr, piece_length);
00633                 p -> load_slice_m(fil, p -> MapComplete());
00634                 p -> load_slice_m(fil, p -> MapRequested());
00635                 ref.push_back(p);
00636         }
00637 }
00638 
00639 
00640 bool Session::GenerateRequest(Peer *peer)
00641 {
00642         request_v& reqs = peer -> Requests();
00643         size_t piece;
00644         size_t offset;
00645         size_t length;
00646         if (!GetRandomNotRequested(peer, piece, offset, length))
00647         {
00648 //printf(" *** GetRandomNotRequested failed\n");
00649                 return false;
00650 //              break; // no more unrequested available slices this peer can supply
00651         }
00652         Request *r = AddRequest(peer, piece, offset, length);
00653         if (r)
00654         {
00655                 reqs.push_back(r);
00656         }
00657         return true;
00658 }
00659 
00660 
00661 bool Session::PieceUnique(size_t piece)
00662 {
00663         for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00664         {
00665                 Peer *peer = *it;
00666                 if (peer -> IsSet(piece))
00667                 {
00668                         return false;
00669                 }
00670         }
00671         return true;
00672 }
00673 
00674 
00675 void Session::PeerStatus() 
00676 {
00677         for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00678         {
00679                 Peer *peer = *it;
00680                 pSocket *sock = peer -> PeerSocket();
00681                 if (sock)
00682                 {
00683                         sock -> ShowStatus(m_number_of_pieces);
00684 //                      printf("%20s %5d / %5d\n", peer -> GetIP().c_str(), peer -> GetSet(), m_number_of_pieces);
00685                 }
00686         }
00687 }
00688 
00689 
00690 void Session::Update()
00691 {
00692         PeerHandler& ref = static_cast<PeerHandler&>(Handler());
00693         bool debug_time = (ref.GetDebug() & 512) ? true : false;
00694 DEB(    Clock ck;)
00695         // add connections
00696         size_t q = ref.PeerCount(m_info_hash);
00697         if (q < ref.GetMinPeers())
00698         {
00699                 AddConnect();
00700         }
00701 DEB(    if (debug_time) ck.PrintDiff("1");)
00702         // update interested
00703         if (m_b_update_interested)
00704         {
00705                 for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00706                 {
00707                         Peer *peer = *it;
00708                         pSocket *sock = peer -> PeerSocket();
00709                         if (sock && sock -> CTS())
00710                         {
00711 //printf("Check interest: %s\n", peer -> GetIP().c_str());
00712                                 bool interested = false;
00713                                 for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end(); it++)
00714                                 {
00715                                         Piece *p = *it;
00716                                         if (peer -> IsSet(p -> GetNumber()))
00717                                         {
00718                                                 interested = true;
00719                                                 break;
00720                                         }
00721                                 }
00722                                 if (interested != sock -> Interested())
00723                                 {
00724                                         sock -> SendInterest(interested);
00725                                 }
00726                         } // Connected
00727                 }
00728                 m_b_update_interested = false;
00729         }
00730 DEB(    if (debug_time) ck.PrintDiff("2");)
00731         // make sure all unchoked pSocket's has at least 1 piece OR 256 kB requested data
00732         for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00733         {
00734                 Peer *peer = *it;
00735                 request_v& reqs = peer -> Requests();
00736                 // TODO: ignore choke
00737                 pSocket *sock = peer -> PeerSocket();
00738                 // peer -> IsChoked - Choke received from remote end
00739                 // sock -> Choked - we sent Choke to remote end
00740                 if (sock && sock -> CTS() && !peer -> IsChoked() )
00741                 {
00742                         size_t ant = GetPieceLength() / SLICE;
00743 //                      size_t n = (ant * 3) / 2; //20; // min number of requests to queue
00744                         if (reqs.size() < ant / 2)
00745                         {
00746                                 RequestAvailable(peer);
00747 /*
00748                                 if (!GenerateRequest(peer))
00749                                 {
00750 //                                      break;
00751                                 }
00752 */
00753                         }
00754                 }
00755                 else
00756                 {
00757                         if (reqs.size()) // unconnected/choked peer has booked requests - remove
00758                         {
00759                                 while (reqs.size())
00760                                 {
00761                                         request_v::iterator it = reqs.begin();
00762                                         Request *r = *it;
00763                                         RemoveRequest(peer, r -> GetPiece(), r -> GetOffset(), r -> GetLength());
00764                                         delete r;
00765                                         reqs.erase(it);
00766                                 }
00767                                 if (sock && sock -> CTS() && !ref.IgnoreChoke() )
00768                                 {
00769                                         sock -> SetCloseAndDelete();
00770                                 }
00771                         }
00772                 }
00773         }
00774 DEB(    if (debug_time) ck.PrintDiff("3");)
00775         // check for completed pieces
00776         if (m_b_check_complete)
00777         {
00778                 for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end(); it++)
00779                 {
00780                         Piece *p = *it;
00781                         if (p -> Complete())
00782                         {
00783 //DEB(printf("Found complete piece\n");)
00784                                 if (m_filemanager -> Verify(p -> GetNumber(),p -> PieceLength()) )
00785                                 {
00786                                         m_complete.push_back(p);
00787                                         ref.SendHave(m_info_hash, p -> GetNumber());
00788                                         m_incomplete.erase(it);
00789                                         p -> ClearComplete();
00790                                         p -> ClearRequested();
00791                                         Save();
00792                                         break;
00793                                 }
00794                                 else
00795                                 {
00796 DEB(printf("\n\n\n *** Verify failed\n\n\n\n");)
00797                                         p -> ClearComplete();
00798                                         p -> ClearRequested();
00799                                 }
00800                         }
00801                 }
00802                 m_b_check_complete = false;
00803         }
00804 DEB(    if (debug_time) ck.PrintDiff("4");)
00805         // unchoke --- replaced by another choke method further down
00806 /*
00807         for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00808         {
00809                 Peer *peer = *it;
00810                 pSocket *sock = peer -> PeerSocket();
00811                 if (sock && sock -> CTS())
00812                 {
00813                         if (sock -> Choked() && sock -> ChokeTime() > ref.LocalChokeTime() )
00814                         {
00815                                 sock -> SendChoke(false);
00816                         }
00817                 }
00818         }
00819 DEB(    if (debug_time) ck.PrintDiff("5");)
00820 */
00821         // reset m_complete::requested when all are set
00822 /*
00823         bool reset = true;
00824         for (piece_v::iterator it = m_complete.begin(); it != m_complete.end(); it++)
00825         {
00826                 Piece *p = *it;
00827                 if (PieceUnique(p -> GetNumber()))
00828                 {
00829                         reset = false;
00830                         break;
00831                 }
00832         }
00833         if (reset)
00834         {
00835                 for (piece_v::iterator it = m_complete.begin(); it != m_complete.end(); it++)
00836                 {
00837                         Piece *p = *it;
00838                         p -> ClearRequested();
00839                 }
00840         }
00841 DEB(    if (debug_time) ck.PrintDiff("6");)
00842 */
00843         // check age of requests
00844         if (time(NULL) % 5 == 0)
00845         {
00846                 CheckRequests();
00847         }
00848 /*
00849         for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00850         {
00851                 Peer *peer = *it;
00852                 peer -> CheckRequests();
00853         }
00854 DEB(    if (debug_time) ck.PrintDiff("7");)
00855 */
00856         // kick peers with no score
00857         if (time(NULL) % 30 == 0)
00858         {
00859 /*
00860                 bitmap_t bitmap(m_number_of_pieces);
00861                 for (piece_v::iterator it = m_complete.begin(); it != m_complete.end(); it++)
00862                 {
00863                         Piece *p = *it;
00864                         bitmap.set(p -> GetNumber());
00865                 }
00866                 for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00867                 {
00868                         Peer *peer = *it;
00869                         pSocket *sock = peer -> PeerSocket();
00870                         if (sock)
00871                         {
00872                                 if (sock -> CTS())
00873                                 {
00874                                         if (!peer -> Score(&bitmap))
00875                                         {
00876 DEB(printf("Kicking peer with score 0: %s\n", peer -> GetIP().c_str());)
00877                                                 sock -> SetCloseAndDelete();
00878                                                 peer -> SetFailed();
00879                                         }
00880                                 }
00881                         }
00882                 }
00883 */
00884                 double complete = (double)m_complete.size();
00885                 double incomplete = (double)m_incomplete.size();
00886 DEB(printf("Progress: %.2f%%\n", complete * 100 / (complete + incomplete));)
00887         }
00888 DEB(    if (debug_time) ck.PrintDiff("9");)
00889         // get number of downloaders
00890         if (time(NULL) % 5 == 0)
00891         {
00892                 size_t qd = 0;
00893                 std::vector<Peer *> interested_choked;
00894                 for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00895                 {
00896                         Peer *peer = *it;
00897                         pSocket *sock = peer -> PeerSocket();
00898                         if (sock)
00899                         {
00900                                 if (peer -> IsInterested() && sock -> Choked())
00901                                 {
00902                                         interested_choked.push_back(peer);
00903                                 }
00904                                 if (!sock -> Choked())
00905                                 {
00906                                         qd++;
00907                                 }
00908                         }
00909                 }
00910                 q = interested_choked.size();
00911                 if (qd < ref.GetDownloaders() + ref.GetOptimistic() && q > 0)
00912                 {
00913                         Peer *peer = interested_choked[random() % q];
00914                         pSocket *sock = peer -> PeerSocket();
00915                         if (sock)
00916                         {
00917                                 sock -> SendChoke(false);
00918                         }
00919                 }
00920         }
00921 DEB(    if (debug_time) ck.PrintDiff("0");)
00922         // calculate up/down speed
00923         if (ref.GetDebug() & 4096)
00924         {
00925                 size_t r = 0;
00926                 size_t w = 0;
00927                 for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00928                 {
00929                         Peer *peer = *it;
00930                         if (peer)
00931                         {
00932                                 pSocket *sock = peer -> PeerSocket();
00933                                 if (sock)
00934                                 {
00935                                         r += sock -> GetBytesR();
00936                                         w += sock -> GetBytesW();
00937                                 }
00938                         }
00939                 }
00940                 printf("Down: %d.%03d kB/sec  Up: %d.%03d kB/sec\n",
00941                         r / 1024, r % 1024,
00942                         w / 1024, w % 1024);
00943         }
00944         // use OnWriteComplete to send next piece to each pSocket
00945         // send "unsent" slices first of all
00946 
00947         // check for all complete
00948         if (!m_incomplete.size())
00949         {
00950 DEB(printf("All complete: %s\n", m_info_hash.c_str());)
00951         }
00952         // kick downloader with lowest d/l rate
00953         if (time(NULL) % 30 == 0)
00954         {
00955                 ref.CheckDownloadRate();
00956         }
00957 }
00958 
00959 
00960 /*
00961 size_t Session::GetInstances(size_t piece)
00962 {
00963         size_t q = 0;
00964         for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
00965         {
00966                 Peer *p = *it;
00967                 pSocket *sock = p -> PeerSocket();
00968                 if (sock && sock -> CTS() && p -> IsSet(piece))
00969                 {
00970                         q++;
00971                 }
00972         }
00973         return q;
00974 }
00975 */
00976 
00977 
00978 void Session::Status(TcpSocket *p) 
00979 {
00980         p -> Send("+---------------------------------------------------------------------<br>");
00981         p -> Sendf(" Hash             : %s<br>", m_info_hash.c_str());
00982         p -> Sendf(" Announce         : %s<br>", m_announce.c_str());
00983         p -> Sendf(" Name             : %s<br>", m_name.c_str());
00984         p -> Sendf("<table><tr>"
00985                 "<td class=h align='center'>Files</td>"
00986                 "<td class=h align='center'>Piece length</td>"
00987                 "<td class=h align='center'>Total length</td>"
00988                 "<td class=h align='center'>Number of pieces</td>"
00989                 "<td class=h align='center'>Complete pieces</td>"
00990                 "<td class=h align='center'>Last length</td>"
00991                 "<td class=h align='center'>Pieces length</td>"
00992                 "</tr>"
00993                 "<tr>");
00994         p -> Sendf("<td class=h align='center'>%d</td>", m_files.size());
00995         p -> Sendf("<td class=h align='center'>%lld</td>", m_piece_length);
00996         p -> Sendf("<td class=h align='center'>%lld</td>", m_length);
00997         p -> Sendf("<td class=h align='center'>%d</td>", m_number_of_pieces);
00998         p -> Sendf("<td class=h align='center'>%d</td>", m_complete.size());
00999         p -> Sendf("<td class=h align='center'>%d</td>", m_last_length);
01000         p -> Sendf("<td class=h align='center'>%d (%d pieces)</td>", m_pieces.size(), m_pieces.size() / 20);
01001         p -> Sendf("</tr></table>");
01002         p -> Send("+---------------------------------------------------------------------<br>");
01003 
01004         std::map<size_t,piece_v> mmap;
01005         for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end(); it++)
01006         {
01007                 Piece *p0 = *it;
01008                 size_t q = p0 -> NumberComplete();
01009                 if (q)
01010                         mmap[q].push_back(p0);
01011         }
01012         piece_v tmp;
01013         {
01014                 for (std::map<size_t,piece_v>::iterator it = mmap.begin(); it != mmap.end(); it++)
01015                 {
01016                         size_t q = (*it).first;
01017                         for (piece_v::iterator it = mmap[q].begin(); it != mmap[q].end(); it++)
01018                         {
01019                                 Piece *p0 = *it;
01020                                 tmp.insert(tmp.begin(), p0);
01021                         }
01022                 }
01023         }
01024         if (tmp.size())
01025         {
01026                 p -> Sendf("<table><tr>"
01027                         "<td class=h>Piece</td>"
01028                         "<td class=h>Complete</td>"
01029                         "<td class=h>Requested</td>"
01030                         "<td class=h>Availability</td>"
01031                         "</tr>");
01032                 for (piece_v::iterator it = tmp.begin(); it != tmp.end(); it++)
01033                 {
01034                         Piece *p0 = *it;
01035                         size_t q = p0 -> NumberComplete();
01036                         size_t req = p0 -> NumberRequested();
01037                         size_t avail = Available(p0 -> GetNumber());
01038                         if (avail || req)
01039                         {
01040                                 p -> Sendf("<tr><td class=h>%d</td><td class=h>%d</td><td class=h>%d</td><td class=h>%d</td></tr>",
01041                                         p0 -> GetNumber(), q, req, avail );
01042                         }
01043                 }
01044                 p -> Send("</table>");
01045         }
01046 }
01047 
01048 
01049 size_t Session::Available(int piece)
01050 {
01051         size_t q = 0;
01052         for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
01053         {
01054                 Peer *peer = *it;
01055                 if (peer -> IsSet(piece) && !peer -> IsChoked())
01056                         q++;
01057         }
01058         return q;
01059 }
01060 
01061 
01062 void Session::RequestAvailable(Peer *peer)
01063 {
01064         request_v& reqs = peer -> Requests();
01065         size_t ant = GetPieceLength() / SLICE;
01066         int q = (ant * 3) / 2; // requests to add
01067         // add requests for most complete
01068         if (q > 0)
01069         {
01070                 std::map<size_t,piece_v> mmap;
01071                 for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end() && q > 0; it++)
01072                 {
01073                         Piece *p = *it;
01074                         if (peer -> IsSet(p -> GetNumber()) && !p -> AllRequested())
01075                         {
01076                                 mmap[p -> NumberComplete()].push_back(p);
01077                         }
01078                 }
01079                 piece_v tmp;
01080                 {
01081                         for (std::map<size_t,piece_v>::iterator it = mmap.begin(); it != mmap.end(); it++)
01082                         {
01083                                 piece_v& ref = (*it).second;
01084                                 for (piece_v::iterator it2 = ref.begin(); it2 != ref.end(); it2++)
01085                                 {
01086                                         Piece *p = *it2;
01087                                         tmp.insert(tmp.begin(), p);
01088                                 }
01089                         }
01090                 }
01091                 {
01092                         for (piece_v::iterator it = tmp.begin(); it != tmp.end() && q > 0; it++)
01093                         {
01094                                 Piece *p = *it;
01095                                 int max = 20;
01096                                 while (peer -> IsSet(p -> GetNumber()) && !p -> AllRequested() && max-- && q--)
01097                                 {
01098                                         size_t offset;
01099                                         size_t length;
01100                                         if (p -> GetRandomNotRequested(offset, length))
01101                                         {
01102                                                 Request *r = AddRequest(peer, p -> GetNumber(), offset, length);
01103                                                 if (r)
01104                                                 {
01105                                                         reqs.push_back(r);
01106                                                 }
01107                                         }
01108                                         else
01109                                         {
01110                                                 printf(" --------> GetRandomNotRequested failed\n");
01111                                         }
01112                                 }
01113                         }
01114                 }
01115         }
01116         // add requests for least available
01117         if (q > 0)
01118         {
01119                 std::map<size_t,piece_v> mmap;
01120                 for (piece_v::iterator it = m_incomplete.begin(); it != m_incomplete.end() && q > 0; it++)
01121                 {
01122                         Piece *p = *it;
01123                         if (peer -> IsSet(p -> GetNumber()) && !p -> AllRequested())
01124                         {
01125                                 mmap[Available(p -> GetNumber())].push_back(p);
01126                         }
01127                 }
01128                 piece_v tmp;
01129                 {
01130                         for (std::map<size_t,piece_v>::iterator it = mmap.begin(); it != mmap.end(); it++)
01131                         {
01132                                 piece_v& ref = (*it).second;
01133                                 for (piece_v::iterator it2 = ref.begin(); it2 != ref.end(); it2++)
01134                                 {
01135                                         Piece *p = *it2;
01136                                         tmp.push_back(p);
01137                                 }
01138                         }
01139                 }
01140                 {
01141                         for (piece_v::iterator it = tmp.begin(); it != tmp.end() && q > 0; it++)
01142                         {
01143                                 Piece *p = *it;
01144                                 int max = 20;
01145                                 while (peer -> IsSet(p -> GetNumber()) && !p -> AllRequested() && max-- && q--)
01146                                 {
01147                                         size_t offset;
01148                                         size_t length;
01149                                         if (p -> GetRandomNotRequested(offset, length))
01150                                         {
01151                                                 Request *r = AddRequest(peer, p -> GetNumber(), offset, length);
01152                                                 if (r)
01153                                                 {
01154                                                         reqs.push_back(r);
01155                                                 }
01156                                         }
01157                                         else
01158                                         {
01159                                                 printf(" --------> GetRandomNotRequested failed\n");
01160                                         }
01161                                 }
01162                         }
01163                 }
01164         }
01165         // add random requests
01166         if (q > 0)
01167         {
01168                 int max = 20;
01169                 while (q-- && max--)
01170                 {
01171                         GenerateRequest(peer);
01172                 }
01173         }
01174 }
01175 
01176 
01177 void Session::RemoveRequests(Peer *peer)
01178 {
01179         request_v& reqs = peer -> Requests();
01180         while (reqs.size())
01181         {
01182                 request_v::iterator it = reqs.begin();
01183                 Request *r = *it;
01184                 size_t piece = r -> GetPiece();
01185                 size_t offset = r -> GetOffset();
01186                 size_t length = r -> GetLength();
01187                 RemoveRequest(peer, piece, offset, length);
01188                 delete r;
01189                 reqs.erase(it);
01190         }
01191 }
01192 
01193 
01194 void Session::CheckRequests()
01195 {
01196         for (peer_v::iterator it = m_peers.begin(); it != m_peers.end(); it++)
01197         {
01198                 Peer *peer = *it;
01199                 request_v& reqs = peer -> Requests();
01200                 for (request_v::iterator it = reqs.begin(); it != reqs.end(); it++)
01201                 {
01202                         Request *r = *it;
01203                         if (r -> Age() > static_cast<PeerHandler&>(Handler()).MaxRequestAge() )
01204                         {
01205                                 pSocket *sock = peer -> PeerSocket();
01206                                 if (sock)
01207                                         sock -> SetCloseAndDelete();
01208 /*
01209                                 size_t piece = r -> GetPiece();
01210                                 size_t offset = r -> GetOffset();
01211                                 size_t length = r -> GetLength();
01212                                 RemoveRequest(peer, piece, offset, length);
01213                                 delete r;
01214                                 reqs.erase(it);
01215                                 break;
01216 */
01217                         }
01218                 }
01219         }
01220 }
01221 
01222 
Page, code, and content Copyright (C) 2006 by Anders Hedström
Generated on Mon Aug 29 20:21:47 2005 for C++ Sockets by  doxygen 1.4.4