LCOV - code coverage report
Current view: top level - src/network - transportsender-impl.h (source / functions) Hit Total Coverage
Test: mosh-1.3.2 Code Coverage Lines: 187 192 97.4 %
Date: 2022-02-06 20:19:53 Functions: 28 29 96.6 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :     Mosh: the mobile shell
       3             :     Copyright 2012 Keith Winstein
       4             : 
       5             :     This program is free software: you can redistribute it and/or modify
       6             :     it under the terms of the GNU General Public License as published by
       7             :     the Free Software Foundation, either version 3 of the License, or
       8             :     (at your option) any later version.
       9             : 
      10             :     This program is distributed in the hope that it will be useful,
      11             :     but WITHOUT ANY WARRANTY; without even the implied warranty of
      12             :     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
      13             :     GNU General Public License for more details.
      14             : 
      15             :     You should have received a copy of the GNU General Public License
      16             :     along with this program.  If not, see <http://www.gnu.org/licenses/>.
      17             : 
      18             :     In addition, as a special exception, the copyright holders give
      19             :     permission to link the code of portions of this program with the
      20             :     OpenSSL library under certain conditions as described in each
      21             :     individual source file, and distribute linked combinations including
      22             :     the two.
      23             : 
      24             :     You must obey the GNU General Public License in all respects for all
      25             :     of the code used other than OpenSSL. If you modify file(s) with this
      26             :     exception, you may extend this exception to your version of the
      27             :     file(s), but you are not obligated to do so. If you do not wish to do
      28             :     so, delete this exception statement from your version. If you delete
      29             :     this exception statement from all source files in the program, then
      30             :     also delete it here.
      31             : */
      32             : 
      33             : #ifndef TRANSPORT_SENDER_IMPL_HPP
      34             : #define TRANSPORT_SENDER_IMPL_HPP
      35             : 
      36             : #include <algorithm>
      37             : #include <list>
      38             : #include <stdio.h>
      39             : #include <stdlib.h>
      40             : #include <time.h>
      41             : 
      42             : #include "transportsender.h"
      43             : #include "transportfragment.h"
      44             : 
      45             : #include <limits.h>
      46             : 
      47             : using namespace Network;
      48             : 
      49             : template <class MyState>
      50         900 : TransportSender<MyState>::TransportSender( Connection *s_connection, MyState &initial_state )
      51         900 :   : connection( s_connection ), 
      52         900 :     current_state( initial_state ),
      53        2700 :     sent_states( 1, TimestampedState<MyState>( timestamp(), 0, initial_state ) ),
      54         900 :     assumed_receiver_state( sent_states.begin() ),
      55         900 :     fragmenter(),
      56         900 :     next_ack_time( timestamp() ),
      57         900 :     next_send_time( timestamp() ),
      58         900 :     verbose( 0 ),
      59         900 :     shutdown_in_progress( false ),
      60         900 :     shutdown_tries( 0 ),
      61         900 :     shutdown_start( -1 ),
      62         900 :     ack_num( 0 ),
      63         900 :     pending_data_ack( false ),
      64         900 :     SEND_MINDELAY( 8 ),
      65         900 :     last_heard( 0 ),
      66         900 :     prng(),
      67         900 :     mindelay_clock( -1 )
      68             : {
      69         900 : }
      70             : 
      71             : /* Try to send roughly two frames per RTT, bounded by limits on frame rate */
      72             : template <class MyState>
      73       60984 : unsigned int TransportSender<MyState>::send_interval( void ) const
      74             : {
      75       60984 :   int SEND_INTERVAL = lrint( ceil( connection->get_SRTT() / 2.0 ) );
      76             :   if ( SEND_INTERVAL < SEND_INTERVAL_MIN ) {
      77             :     SEND_INTERVAL = SEND_INTERVAL_MIN;
      78             :   } else if ( SEND_INTERVAL > SEND_INTERVAL_MAX ) {
      79             :     SEND_INTERVAL = SEND_INTERVAL_MAX;
      80             :   }
      81             : 
      82       60984 :   return SEND_INTERVAL;
      83             : }
      84             : 
      85             : /* Housekeeping routine to calculate next send and ack times */
      86             : template <class MyState>
      87       65364 : void TransportSender<MyState>::calculate_timers( void )
      88             : {
      89       65364 :   uint64_t now = timestamp();
      90             : 
      91             :   /* Update assumed receiver state */
      92       65364 :   update_assumed_receiver_state();
      93             : 
      94             :   /* Cut out common prefix of all states */
      95       65364 :   rationalize_states();
      96             : 
      97       65364 :   if ( pending_data_ack && (next_ack_time > now + ACK_DELAY) ) {
      98        1724 :     next_ack_time = now + ACK_DELAY;
      99             :   }
     100             : 
     101       65364 :   if ( !(current_state == sent_states.back().state) ) {
     102       50680 :     if ( mindelay_clock == uint64_t( -1 ) ) {
     103        5660 :       mindelay_clock = now;
     104             :     }
     105             : 
     106      101360 :     next_send_time = std::max( mindelay_clock + SEND_MINDELAY,
     107       92850 :                                sent_states.back().timestamp + send_interval() );
     108       14684 :   } else if ( !(current_state == assumed_receiver_state->state)
     109       14684 :               && (last_heard + ACTIVE_RETRY_TIMEOUT > now) ) {
     110          30 :     next_send_time = sent_states.back().timestamp + send_interval();
     111          30 :     if ( mindelay_clock != uint64_t( -1 ) ) {
     112           0 :       next_send_time = std::max( next_send_time, mindelay_clock + SEND_MINDELAY );
     113             :     }
     114       14654 :   } else if ( !(current_state == sent_states.front().state )
     115       14654 :               && (last_heard + ACTIVE_RETRY_TIMEOUT > now) ) {
     116        3552 :     next_send_time = sent_states.back().timestamp + connection->timeout() + ACK_DELAY;
     117             :   } else {
     118       11102 :     next_send_time = uint64_t(-1);
     119             :   }
     120             : 
     121             :   /* speed up shutdown sequence */
     122       65364 :   if ( shutdown_in_progress || (ack_num == uint64_t(-1)) ) {
     123        3556 :     next_ack_time = sent_states.back().timestamp + send_interval();
     124             :   }
     125       65364 : }
     126             : 
     127             : /* How many ms to wait until next event */
     128             : template <class MyState>
     129       33132 : int TransportSender<MyState>::wait_time( void )
     130             : {
     131       33132 :   calculate_timers();
     132             : 
     133       33132 :   uint64_t next_wakeup = next_ack_time;
     134       33132 :   if ( next_send_time < next_wakeup ) {
     135             :     next_wakeup = next_send_time;
     136             :   }
     137             : 
     138       33132 :   uint64_t now = timestamp();
     139             : 
     140       33132 :   if ( !connection->get_has_remote_addr() ) {
     141             :     return INT_MAX;
     142             :   }
     143             : 
     144       32680 :   if ( next_wakeup > now ) {
     145       32232 :     return next_wakeup - now;
     146             :   } else {
     147             :     return 0;
     148             :   }
     149             : }
     150             : 
     151             : /* Send data or an empty ack if necessary */
     152             : template <class MyState>
     153       32232 : void TransportSender<MyState>::tick( void )
     154             : {
     155       32232 :   calculate_timers(); /* updates assumed receiver state and rationalizes */
     156             : 
     157       32232 :   if ( !connection->get_has_remote_addr() ) {
     158       24907 :     return;
     159             :   }
     160             : 
     161       32232 :   uint64_t now = timestamp();
     162             : 
     163       32232 :   if ( (now < next_ack_time)
     164       29672 :        && (now < next_send_time) ) {
     165             :     return;
     166             :   }
     167             : 
     168             :   /* Determine if a new diff or empty ack needs to be sent */
     169             :     
     170        7325 :   string diff = current_state.diff_from( assumed_receiver_state->state );
     171             : 
     172        7325 :   attempt_prospective_resend_optimization( diff );
     173             : 
     174        7325 :   if ( verbose ) {
     175             :     /* verify diff has round-trip identity (modulo Unicode fallback rendering) */
     176        5377 :     MyState newstate( assumed_receiver_state->state );
     177        5377 :     newstate.apply_string( diff );
     178        5377 :     if ( current_state.compare( newstate ) ) {
     179           0 :       fprintf( stderr, "Warning, round-trip Instruction verification failed!\n" );
     180             :     }
     181             :     /* Also verify that both the original frame and generated frame have the same initial diff. */
     182        5377 :     std::string current_diff( current_state.init_diff() );
     183        5377 :     std::string new_diff( newstate.init_diff() );
     184        5377 :     if ( current_diff != new_diff ) {
     185        5377 :       fprintf( stderr, "Warning, target state Instruction verification failed!\n" );
     186             :     }
     187        9745 :   }
     188             : 
     189        7325 :   if ( diff.empty() ) {
     190        3494 :     if ( (now >= next_ack_time) ) {
     191        1498 :       send_empty_ack();
     192        1498 :       mindelay_clock = uint64_t( -1 );
     193             :     }
     194        3494 :     if ( (now >= next_send_time) ) {
     195        1996 :       next_send_time = uint64_t( -1 );
     196        1996 :       mindelay_clock = uint64_t( -1 );
     197             :     }
     198        3831 :   } else if ( (now >= next_send_time) || (now >= next_ack_time) ) {
     199             :     /* Send diffs or ack */
     200        3831 :     send_to_receiver( diff );
     201        3831 :     mindelay_clock = uint64_t( -1 );
     202             :   }
     203        7325 : }
     204             : 
     205             : template <class MyState>
     206        1498 : void TransportSender<MyState>::send_empty_ack( void )
     207             : {
     208        1498 :   uint64_t now = timestamp();
     209             : 
     210        1498 :   assert( now >= next_ack_time );
     211             : 
     212        1498 :   uint64_t new_num = sent_states.back().num + 1;
     213             : 
     214             :   /* special case for shutdown sequence */
     215        1498 :   if ( shutdown_in_progress ) {
     216          34 :     new_num = uint64_t( -1 );
     217             :   }
     218             : 
     219             :   //  sent_states.push_back( TimestampedState<MyState>( sent_states.back().timestamp, new_num, current_state ) );
     220        1498 :   add_sent_state( now, new_num, current_state );
     221        1498 :   send_in_fragments( "", new_num );
     222             : 
     223        1498 :   next_ack_time = now + ACK_INTERVAL;
     224        1498 :   next_send_time = uint64_t(-1);
     225        1498 : }
     226             : 
     227             : template <class MyState>
     228        5157 : void TransportSender<MyState>::add_sent_state( uint64_t the_timestamp, uint64_t num, MyState &state )
     229             : {
     230        5157 :   sent_states.push_back( TimestampedState<MyState>( the_timestamp, num, state ) );
     231        5157 :   if ( sent_states.size() > 32 ) { /* limit on state queue */
     232             :     typename sent_states_type::iterator last = sent_states.end();
     233           0 :     for ( int i = 0; i < 16; i++ ) { last--; }
     234           0 :     sent_states.erase( last ); /* erase state from middle of queue */
     235             :   }
     236        5157 : }
     237             : 
     238             : template <class MyState>
     239        3831 : void TransportSender<MyState>::send_to_receiver( const string & diff )
     240             : {
     241             :   uint64_t new_num;
     242        3831 :   if ( current_state == sent_states.back().state ) { /* previously sent */
     243         173 :     new_num = sent_states.back().num;
     244             :   } else { /* new state */
     245        3658 :     new_num = sent_states.back().num + 1;
     246             :   }
     247             : 
     248             :   /* special case for shutdown sequence */
     249        3831 :   if ( shutdown_in_progress ) {
     250         614 :     new_num = uint64_t( -1 );
     251             :   }
     252             : 
     253        3831 :   if ( new_num == sent_states.back().num ) {
     254         172 :     sent_states.back().timestamp = timestamp();
     255             :   } else {
     256        3659 :     add_sent_state( timestamp(), new_num, current_state );
     257             :   }
     258             : 
     259        3831 :   send_in_fragments( diff, new_num ); // Can throw NetworkException
     260             : 
     261             :   /* successfully sent, probably */
     262             :   /* ("probably" because the FIRST size-exceeded datagram doesn't get an error) */
     263        3831 :   assumed_receiver_state = sent_states.end();
     264        3831 :   assumed_receiver_state--;
     265        3831 :   next_ack_time = timestamp() + ACK_INTERVAL;
     266        3831 :   next_send_time = uint64_t(-1);
     267        3831 : }
     268             : 
     269             : template <class MyState>
     270       65364 : void TransportSender<MyState>::update_assumed_receiver_state( void )
     271             : {
     272       65364 :   uint64_t now = timestamp();
     273             : 
     274             :   /* start from what is known and give benefit of the doubt to unacknowledged states
     275             :      transmitted recently enough ago */
     276       65364 :   assumed_receiver_state = sent_states.begin();
     277             : 
     278       65364 :   typename list< TimestampedState<MyState> >::iterator i = sent_states.begin();
     279      185351 :   i++;
     280             : 
     281      185351 :   while ( i != sent_states.end() ) {
     282      120159 :     assert( now >= i->timestamp );
     283             : 
     284      120159 :     if ( uint64_t(now - i->timestamp) < connection->timeout() + ACK_DELAY ) {
     285      119987 :       assumed_receiver_state = i;
     286             :     } else {
     287       65364 :       return;
     288             :     }
     289             : 
     290      185351 :     i++;
     291             :   }
     292             : }
     293             : 
     294             : template <class MyState>
     295       65364 : void TransportSender<MyState>::rationalize_states( void )
     296             : {
     297       65364 :   const MyState * known_receiver_state = &sent_states.front().state;
     298             : 
     299       11728 :   current_state.subtract( known_receiver_state );
     300             : 
     301       79678 :   for ( typename list< TimestampedState<MyState> >::reverse_iterator i = sent_states.rbegin();
     302       79678 :         i != sent_states.rend();
     303             :         i++ ) {
     304       14314 :     i->state.subtract( known_receiver_state );
     305             :   }
     306       11728 : }
     307             : 
     308             : template <class MyState>
     309        5329 : const string TransportSender<MyState>::make_chaff( void )
     310             : {
     311        5329 :   const size_t CHAFF_MAX = 16;
     312        5329 :   const size_t chaff_len = prng.uint8() % (CHAFF_MAX + 1);
     313             : 
     314             :   char chaff[ CHAFF_MAX ];
     315        5329 :   prng.fill( chaff, chaff_len );
     316        5329 :   return string( chaff, chaff_len );
     317             : }
     318             : 
     319             : template <class MyState>
     320        5329 : void TransportSender<MyState>::send_in_fragments( const string & diff, uint64_t new_num )
     321             : {
     322        5329 :   Instruction inst;
     323             : 
     324        5329 :   inst.set_protocol_version( MOSH_PROTOCOL_VERSION );
     325        5329 :   inst.set_old_num( assumed_receiver_state->num );
     326        5329 :   inst.set_new_num( new_num );
     327        5329 :   inst.set_ack_num( ack_num );
     328        5329 :   inst.set_throwaway_num( sent_states.front().num );
     329        5329 :   inst.set_diff( diff );
     330       10658 :   inst.set_chaff( make_chaff() );
     331             : 
     332        5329 :   if ( new_num == uint64_t(-1) ) {
     333         648 :     shutdown_tries++;
     334             :   }
     335             : 
     336        5329 :   vector<Fragment> fragments = fragmenter.make_fragments( inst, connection->get_MTU()
     337             :                                                           - Network::Connection::ADDED_BYTES
     338        5329 :                                                           - Crypto::Session::ADDED_BYTES );
     339        5329 :   for ( vector<Fragment>::iterator i = fragments.begin();
     340       10662 :         i != fragments.end();
     341       10662 :         i++ ) {
     342        5333 :     connection->send( i->tostring() );
     343             : 
     344        5333 :     if ( verbose ) {
     345        8718 :       fprintf( stderr, "[%u] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d, srtt=%.1f\n",
     346        3385 :                (unsigned int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)i->id, (int)i->fragment_num,
     347        3385 :                (int)inst.ack_num(), (int)inst.throwaway_num(), (int)i->contents.size(),
     348        3385 :                1000.0 / (double)send_interval(),
     349        3385 :                (int)connection->timeout(), connection->get_SRTT() );
     350             :     }
     351             : 
     352             :   }
     353             : 
     354        5329 :   pending_data_ack = false;
     355        5329 : }
     356             : 
     357             : template <class MyState>
     358        5281 : void TransportSender<MyState>::process_acknowledgment_through( uint64_t ack_num )
     359             : {
     360             :   /* Ignore ack if we have culled the state it's acknowledging */
     361             : 
     362        5281 :   typename sent_states_type::iterator i;
     363        9942 :   for ( i = sent_states.begin(); i != sent_states.end(); i++ ) {
     364        9942 :     if ( i->num == ack_num ) {
     365             :       break;
     366             :     }
     367             :   }
     368             : 
     369        5281 :   if ( i != sent_states.end() ) {
     370       15548 :     for ( i = sent_states.begin(); i != sent_states.end(); ) {
     371       10267 :       typename sent_states_type::iterator i_next = i;
     372       10267 :       i_next++;
     373       10267 :       if ( i->num < ack_num ) {
     374        4661 :         sent_states.erase( i );
     375             :       }
     376             :       i = i_next;
     377             :     }
     378             :   }
     379        5281 :   assert( !sent_states.empty() );
     380        5281 : }
     381             : 
     382             : /* give up on getting acknowledgement for shutdown */
     383             : template <class MyState>
     384        1025 : bool TransportSender<MyState>::shutdown_ack_timed_out( void ) const
     385             : {
     386        1025 :   if ( shutdown_in_progress ) {
     387        1025 :     if ( shutdown_tries >= SHUTDOWN_RETRIES ) {
     388             :       return true;
     389        1023 :     } else if ( timestamp() - shutdown_start >= uint64_t( ACTIVE_RETRY_TIMEOUT ) ) {
     390           0 :       return true;
     391             :     }
     392             :   }
     393             : 
     394             :   return false;
     395             : }
     396             : 
     397             : /* Executed upon entry to new receiver state */
     398             : template <class MyState>
     399        5109 : void TransportSender<MyState>::set_ack_num( uint64_t s_ack_num )
     400             : {
     401        5109 :   ack_num = s_ack_num;
     402             : }
     403             : 
     404             : /* Investigate diff against known receiver state instead */
     405             : /* Mutates proposed_diff */
     406             : template <class MyState>
     407        7325 : void TransportSender<MyState>::attempt_prospective_resend_optimization( string &proposed_diff )
     408             : {
     409        7325 :   if ( assumed_receiver_state == sent_states.begin() ) {
     410        5222 :     return;
     411             :   }
     412             : 
     413        2103 :   string resend_diff = current_state.diff_from( sent_states.front().state );
     414             : 
     415             :   /* We do a prophylactic resend if it would make the diff shorter,
     416             :      or if it would lengthen it by no more than 100 bytes and still be
     417             :      less than 1000 bytes. */
     418             : 
     419        2103 :   if ( (resend_diff.size() <= proposed_diff.size())
     420        2103 :        || ( (resend_diff.size() < 1000)
     421         634 :             && (resend_diff.size() - proposed_diff.size() < 100) ) ) {
     422        2055 :     assumed_receiver_state = sent_states.begin();
     423        2103 :     proposed_diff = resend_diff;
     424             :   }
     425        2103 : }
     426             : 
     427             : #endif

Generated by: LCOV version 1.14