Line data Source code
1 : /*
2 : Mosh: the mobile shell
3 : Copyright 2012 Keith Winstein
4 :
5 : This program is free software: you can redistribute it and/or modify
6 : it under the terms of the GNU General Public License as published by
7 : the Free Software Foundation, either version 3 of the License, or
8 : (at your option) any later version.
9 :
10 : This program is distributed in the hope that it will be useful,
11 : but WITHOUT ANY WARRANTY; without even the implied warranty of
12 : MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 : GNU General Public License for more details.
14 :
15 : You should have received a copy of the GNU General Public License
16 : along with this program. If not, see <http://www.gnu.org/licenses/>.
17 :
18 : In addition, as a special exception, the copyright holders give
19 : permission to link the code of portions of this program with the
20 : OpenSSL library under certain conditions as described in each
21 : individual source file, and distribute linked combinations including
22 : the two.
23 :
24 : You must obey the GNU General Public License in all respects for all
25 : of the code used other than OpenSSL. If you modify file(s) with this
26 : exception, you may extend this exception to your version of the
27 : file(s), but you are not obligated to do so. If you do not wish to do
28 : so, delete this exception statement from your version. If you delete
29 : this exception statement from all source files in the program, then
30 : also delete it here.
31 : */
32 :
33 : #ifndef TRANSPORT_SENDER_IMPL_HPP
34 : #define TRANSPORT_SENDER_IMPL_HPP
35 :
36 : #include <algorithm>
37 : #include <list>
38 : #include <stdio.h>
39 : #include <stdlib.h>
40 : #include <time.h>
41 :
42 : #include "transportsender.h"
43 : #include "transportfragment.h"
44 :
45 : #include <limits.h>
46 :
47 : using namespace Network;
48 :
49 : template <class MyState>
50 900 : TransportSender<MyState>::TransportSender( Connection *s_connection, MyState &initial_state )
51 900 : : connection( s_connection ),
52 900 : current_state( initial_state ),
53 2700 : sent_states( 1, TimestampedState<MyState>( timestamp(), 0, initial_state ) ),
54 900 : assumed_receiver_state( sent_states.begin() ),
55 900 : fragmenter(),
56 900 : next_ack_time( timestamp() ),
57 900 : next_send_time( timestamp() ),
58 900 : verbose( 0 ),
59 900 : shutdown_in_progress( false ),
60 900 : shutdown_tries( 0 ),
61 900 : shutdown_start( -1 ),
62 900 : ack_num( 0 ),
63 900 : pending_data_ack( false ),
64 900 : SEND_MINDELAY( 8 ),
65 900 : last_heard( 0 ),
66 900 : prng(),
67 900 : mindelay_clock( -1 )
68 : {
69 900 : }
70 :
71 : /* Try to send roughly two frames per RTT, bounded by limits on frame rate */
72 : template <class MyState>
73 60984 : unsigned int TransportSender<MyState>::send_interval( void ) const
74 : {
75 60984 : int SEND_INTERVAL = lrint( ceil( connection->get_SRTT() / 2.0 ) );
76 : if ( SEND_INTERVAL < SEND_INTERVAL_MIN ) {
77 : SEND_INTERVAL = SEND_INTERVAL_MIN;
78 : } else if ( SEND_INTERVAL > SEND_INTERVAL_MAX ) {
79 : SEND_INTERVAL = SEND_INTERVAL_MAX;
80 : }
81 :
82 60984 : return SEND_INTERVAL;
83 : }
84 :
85 : /* Housekeeping routine to calculate next send and ack times */
86 : template <class MyState>
87 65364 : void TransportSender<MyState>::calculate_timers( void )
88 : {
89 65364 : uint64_t now = timestamp();
90 :
91 : /* Update assumed receiver state */
92 65364 : update_assumed_receiver_state();
93 :
94 : /* Cut out common prefix of all states */
95 65364 : rationalize_states();
96 :
97 65364 : if ( pending_data_ack && (next_ack_time > now + ACK_DELAY) ) {
98 1724 : next_ack_time = now + ACK_DELAY;
99 : }
100 :
101 65364 : if ( !(current_state == sent_states.back().state) ) {
102 50680 : if ( mindelay_clock == uint64_t( -1 ) ) {
103 5660 : mindelay_clock = now;
104 : }
105 :
106 101360 : next_send_time = std::max( mindelay_clock + SEND_MINDELAY,
107 92850 : sent_states.back().timestamp + send_interval() );
108 14684 : } else if ( !(current_state == assumed_receiver_state->state)
109 14684 : && (last_heard + ACTIVE_RETRY_TIMEOUT > now) ) {
110 30 : next_send_time = sent_states.back().timestamp + send_interval();
111 30 : if ( mindelay_clock != uint64_t( -1 ) ) {
112 0 : next_send_time = std::max( next_send_time, mindelay_clock + SEND_MINDELAY );
113 : }
114 14654 : } else if ( !(current_state == sent_states.front().state )
115 14654 : && (last_heard + ACTIVE_RETRY_TIMEOUT > now) ) {
116 3552 : next_send_time = sent_states.back().timestamp + connection->timeout() + ACK_DELAY;
117 : } else {
118 11102 : next_send_time = uint64_t(-1);
119 : }
120 :
121 : /* speed up shutdown sequence */
122 65364 : if ( shutdown_in_progress || (ack_num == uint64_t(-1)) ) {
123 3556 : next_ack_time = sent_states.back().timestamp + send_interval();
124 : }
125 65364 : }
126 :
127 : /* How many ms to wait until next event */
128 : template <class MyState>
129 33132 : int TransportSender<MyState>::wait_time( void )
130 : {
131 33132 : calculate_timers();
132 :
133 33132 : uint64_t next_wakeup = next_ack_time;
134 33132 : if ( next_send_time < next_wakeup ) {
135 : next_wakeup = next_send_time;
136 : }
137 :
138 33132 : uint64_t now = timestamp();
139 :
140 33132 : if ( !connection->get_has_remote_addr() ) {
141 : return INT_MAX;
142 : }
143 :
144 32680 : if ( next_wakeup > now ) {
145 32232 : return next_wakeup - now;
146 : } else {
147 : return 0;
148 : }
149 : }
150 :
151 : /* Send data or an empty ack if necessary */
152 : template <class MyState>
153 32232 : void TransportSender<MyState>::tick( void )
154 : {
155 32232 : calculate_timers(); /* updates assumed receiver state and rationalizes */
156 :
157 32232 : if ( !connection->get_has_remote_addr() ) {
158 24907 : return;
159 : }
160 :
161 32232 : uint64_t now = timestamp();
162 :
163 32232 : if ( (now < next_ack_time)
164 29672 : && (now < next_send_time) ) {
165 : return;
166 : }
167 :
168 : /* Determine if a new diff or empty ack needs to be sent */
169 :
170 7325 : string diff = current_state.diff_from( assumed_receiver_state->state );
171 :
172 7325 : attempt_prospective_resend_optimization( diff );
173 :
174 7325 : if ( verbose ) {
175 : /* verify diff has round-trip identity (modulo Unicode fallback rendering) */
176 5377 : MyState newstate( assumed_receiver_state->state );
177 5377 : newstate.apply_string( diff );
178 5377 : if ( current_state.compare( newstate ) ) {
179 0 : fprintf( stderr, "Warning, round-trip Instruction verification failed!\n" );
180 : }
181 : /* Also verify that both the original frame and generated frame have the same initial diff. */
182 5377 : std::string current_diff( current_state.init_diff() );
183 5377 : std::string new_diff( newstate.init_diff() );
184 5377 : if ( current_diff != new_diff ) {
185 5377 : fprintf( stderr, "Warning, target state Instruction verification failed!\n" );
186 : }
187 9745 : }
188 :
189 7325 : if ( diff.empty() ) {
190 3494 : if ( (now >= next_ack_time) ) {
191 1498 : send_empty_ack();
192 1498 : mindelay_clock = uint64_t( -1 );
193 : }
194 3494 : if ( (now >= next_send_time) ) {
195 1996 : next_send_time = uint64_t( -1 );
196 1996 : mindelay_clock = uint64_t( -1 );
197 : }
198 3831 : } else if ( (now >= next_send_time) || (now >= next_ack_time) ) {
199 : /* Send diffs or ack */
200 3831 : send_to_receiver( diff );
201 3831 : mindelay_clock = uint64_t( -1 );
202 : }
203 7325 : }
204 :
205 : template <class MyState>
206 1498 : void TransportSender<MyState>::send_empty_ack( void )
207 : {
208 1498 : uint64_t now = timestamp();
209 :
210 1498 : assert( now >= next_ack_time );
211 :
212 1498 : uint64_t new_num = sent_states.back().num + 1;
213 :
214 : /* special case for shutdown sequence */
215 1498 : if ( shutdown_in_progress ) {
216 34 : new_num = uint64_t( -1 );
217 : }
218 :
219 : // sent_states.push_back( TimestampedState<MyState>( sent_states.back().timestamp, new_num, current_state ) );
220 1498 : add_sent_state( now, new_num, current_state );
221 1498 : send_in_fragments( "", new_num );
222 :
223 1498 : next_ack_time = now + ACK_INTERVAL;
224 1498 : next_send_time = uint64_t(-1);
225 1498 : }
226 :
227 : template <class MyState>
228 5157 : void TransportSender<MyState>::add_sent_state( uint64_t the_timestamp, uint64_t num, MyState &state )
229 : {
230 5157 : sent_states.push_back( TimestampedState<MyState>( the_timestamp, num, state ) );
231 5157 : if ( sent_states.size() > 32 ) { /* limit on state queue */
232 : typename sent_states_type::iterator last = sent_states.end();
233 0 : for ( int i = 0; i < 16; i++ ) { last--; }
234 0 : sent_states.erase( last ); /* erase state from middle of queue */
235 : }
236 5157 : }
237 :
238 : template <class MyState>
239 3831 : void TransportSender<MyState>::send_to_receiver( const string & diff )
240 : {
241 : uint64_t new_num;
242 3831 : if ( current_state == sent_states.back().state ) { /* previously sent */
243 173 : new_num = sent_states.back().num;
244 : } else { /* new state */
245 3658 : new_num = sent_states.back().num + 1;
246 : }
247 :
248 : /* special case for shutdown sequence */
249 3831 : if ( shutdown_in_progress ) {
250 614 : new_num = uint64_t( -1 );
251 : }
252 :
253 3831 : if ( new_num == sent_states.back().num ) {
254 172 : sent_states.back().timestamp = timestamp();
255 : } else {
256 3659 : add_sent_state( timestamp(), new_num, current_state );
257 : }
258 :
259 3831 : send_in_fragments( diff, new_num ); // Can throw NetworkException
260 :
261 : /* successfully sent, probably */
262 : /* ("probably" because the FIRST size-exceeded datagram doesn't get an error) */
263 3831 : assumed_receiver_state = sent_states.end();
264 3831 : assumed_receiver_state--;
265 3831 : next_ack_time = timestamp() + ACK_INTERVAL;
266 3831 : next_send_time = uint64_t(-1);
267 3831 : }
268 :
269 : template <class MyState>
270 65364 : void TransportSender<MyState>::update_assumed_receiver_state( void )
271 : {
272 65364 : uint64_t now = timestamp();
273 :
274 : /* start from what is known and give benefit of the doubt to unacknowledged states
275 : transmitted recently enough ago */
276 65364 : assumed_receiver_state = sent_states.begin();
277 :
278 65364 : typename list< TimestampedState<MyState> >::iterator i = sent_states.begin();
279 185351 : i++;
280 :
281 185351 : while ( i != sent_states.end() ) {
282 120159 : assert( now >= i->timestamp );
283 :
284 120159 : if ( uint64_t(now - i->timestamp) < connection->timeout() + ACK_DELAY ) {
285 119987 : assumed_receiver_state = i;
286 : } else {
287 65364 : return;
288 : }
289 :
290 185351 : i++;
291 : }
292 : }
293 :
294 : template <class MyState>
295 65364 : void TransportSender<MyState>::rationalize_states( void )
296 : {
297 65364 : const MyState * known_receiver_state = &sent_states.front().state;
298 :
299 11728 : current_state.subtract( known_receiver_state );
300 :
301 79678 : for ( typename list< TimestampedState<MyState> >::reverse_iterator i = sent_states.rbegin();
302 79678 : i != sent_states.rend();
303 : i++ ) {
304 14314 : i->state.subtract( known_receiver_state );
305 : }
306 11728 : }
307 :
308 : template <class MyState>
309 5329 : const string TransportSender<MyState>::make_chaff( void )
310 : {
311 5329 : const size_t CHAFF_MAX = 16;
312 5329 : const size_t chaff_len = prng.uint8() % (CHAFF_MAX + 1);
313 :
314 : char chaff[ CHAFF_MAX ];
315 5329 : prng.fill( chaff, chaff_len );
316 5329 : return string( chaff, chaff_len );
317 : }
318 :
319 : template <class MyState>
320 5329 : void TransportSender<MyState>::send_in_fragments( const string & diff, uint64_t new_num )
321 : {
322 5329 : Instruction inst;
323 :
324 5329 : inst.set_protocol_version( MOSH_PROTOCOL_VERSION );
325 5329 : inst.set_old_num( assumed_receiver_state->num );
326 5329 : inst.set_new_num( new_num );
327 5329 : inst.set_ack_num( ack_num );
328 5329 : inst.set_throwaway_num( sent_states.front().num );
329 5329 : inst.set_diff( diff );
330 10658 : inst.set_chaff( make_chaff() );
331 :
332 5329 : if ( new_num == uint64_t(-1) ) {
333 648 : shutdown_tries++;
334 : }
335 :
336 5329 : vector<Fragment> fragments = fragmenter.make_fragments( inst, connection->get_MTU()
337 : - Network::Connection::ADDED_BYTES
338 5329 : - Crypto::Session::ADDED_BYTES );
339 5329 : for ( vector<Fragment>::iterator i = fragments.begin();
340 10662 : i != fragments.end();
341 10662 : i++ ) {
342 5333 : connection->send( i->tostring() );
343 :
344 5333 : if ( verbose ) {
345 8718 : fprintf( stderr, "[%u] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d, srtt=%.1f\n",
346 3385 : (unsigned int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)i->id, (int)i->fragment_num,
347 3385 : (int)inst.ack_num(), (int)inst.throwaway_num(), (int)i->contents.size(),
348 3385 : 1000.0 / (double)send_interval(),
349 3385 : (int)connection->timeout(), connection->get_SRTT() );
350 : }
351 :
352 : }
353 :
354 5329 : pending_data_ack = false;
355 5329 : }
356 :
357 : template <class MyState>
358 5281 : void TransportSender<MyState>::process_acknowledgment_through( uint64_t ack_num )
359 : {
360 : /* Ignore ack if we have culled the state it's acknowledging */
361 :
362 5281 : typename sent_states_type::iterator i;
363 9942 : for ( i = sent_states.begin(); i != sent_states.end(); i++ ) {
364 9942 : if ( i->num == ack_num ) {
365 : break;
366 : }
367 : }
368 :
369 5281 : if ( i != sent_states.end() ) {
370 15548 : for ( i = sent_states.begin(); i != sent_states.end(); ) {
371 10267 : typename sent_states_type::iterator i_next = i;
372 10267 : i_next++;
373 10267 : if ( i->num < ack_num ) {
374 4661 : sent_states.erase( i );
375 : }
376 : i = i_next;
377 : }
378 : }
379 5281 : assert( !sent_states.empty() );
380 5281 : }
381 :
382 : /* give up on getting acknowledgement for shutdown */
383 : template <class MyState>
384 1025 : bool TransportSender<MyState>::shutdown_ack_timed_out( void ) const
385 : {
386 1025 : if ( shutdown_in_progress ) {
387 1025 : if ( shutdown_tries >= SHUTDOWN_RETRIES ) {
388 : return true;
389 1023 : } else if ( timestamp() - shutdown_start >= uint64_t( ACTIVE_RETRY_TIMEOUT ) ) {
390 0 : return true;
391 : }
392 : }
393 :
394 : return false;
395 : }
396 :
397 : /* Executed upon entry to new receiver state */
398 : template <class MyState>
399 5109 : void TransportSender<MyState>::set_ack_num( uint64_t s_ack_num )
400 : {
401 5109 : ack_num = s_ack_num;
402 : }
403 :
404 : /* Investigate diff against known receiver state instead */
405 : /* Mutates proposed_diff */
406 : template <class MyState>
407 7325 : void TransportSender<MyState>::attempt_prospective_resend_optimization( string &proposed_diff )
408 : {
409 7325 : if ( assumed_receiver_state == sent_states.begin() ) {
410 5222 : return;
411 : }
412 :
413 2103 : string resend_diff = current_state.diff_from( sent_states.front().state );
414 :
415 : /* We do a prophylactic resend if it would make the diff shorter,
416 : or if it would lengthen it by no more than 100 bytes and still be
417 : less than 1000 bytes. */
418 :
419 2103 : if ( (resend_diff.size() <= proposed_diff.size())
420 2103 : || ( (resend_diff.size() < 1000)
421 634 : && (resend_diff.size() - proposed_diff.size() < 100) ) ) {
422 2055 : assumed_receiver_state = sent_states.begin();
423 2103 : proposed_diff = resend_diff;
424 : }
425 2103 : }
426 :
427 : #endif
|