#!/usr/bin/perl -w #Random seed #$rand =10; #$standardmode =3; $redoMode = 1; $undoMode = 2; $mode = 1; $COMMITING =1; #PENDING $ENDING =2;#COMMITED $DONE =3;#DONE $START =1; $UPDATE =2; $ENDED =3; $OUTCOME =4; $CHECKPOINT =5; %tid_stat =(); #already assigned tids %current_db = (); #the in-memory database %on_disk_db = (); #the on-disk DB %winners = (); %losers = (); %done = (); $EMPTY = " "; sub get_log_type{ if($_[0] eq "START"){ return $START; }elsif($_[0] eq "UPDATE"){ return $UPDATE; }elsif($_[0] eq "END"){ return $ENDED; }elsif($_[0] eq "OUTCOME"){ return $OUTCOME; } elsif($_[0] eq "CHECKPOINT"){ return $CHECKPOINT; } else { return 0; } } sub get_tid_stat{ my($tid) = $_[0]; if (exists $tid_stat{$tid}){ if($tid_stat{$tid} eq $COMMITING){ return "PENDING(WAITING COMMIT/ABORT)"; } elsif ($tid_stat{$tid} eq $ENDING){ return "COMMITING (WAITING END)"; } else { return "DONE"; } } else { return "NOT INITIALIZED" } } #check if this command is valid in current context sub is_valid_command{ my ($tid, $type, $anum, $value, $oldvalue) = @_; if( $type eq "new"){ #new -> tid not in tid list if (exists $tid_stat{$tid}){ print "NEW tid: $tid already exists\n"; return 0; } else { return 1; } }elsif ($type eq "init"){ #init -> tid status == $COMMITING if ( (exists $tid_stat{$tid}) && ($tid_stat{$tid} == $COMMITING) ){ #complain if the value is in the database if (exists $current_db{$anum}){ print "Cannot initialize $anum to $value - already exists with value $current_db{$anum}\n"; return 0; } else { return 1; } } else { print "INIT tid: $tid is in ".&get_tid_stat($tid)." mode\n"; return 0; } }elsif ( ($type eq "write")||($type eq "credit")||($type eq "debit") ){ #write -> tid status == $COMMITING if ( (exists $tid_stat{$tid}) && ($tid_stat{$tid} == $COMMITING) ){ #complain if the value is not in the database if (exists $current_db{$anum}){ return 1; } else { print "Cannot update $anum - not initialized"; return 0; } } else { print "WRITE/DEBIT/CREDIT tid: $tid is in ".&get_tid_stat($tid)." mode\n"; return 0; } }elsif ($type eq "commit"){ #commit -> tid status == $COMMITING if ( (exists $tid_stat{$tid}) && ($tid_stat{$tid} == $COMMITING) ){ return 1; } else { print "COMMIT tid: $tid is in ".&get_tid_stat($tid)." mode\n"; return 0; } }elsif ($type eq "abort"){ #abort -> tid status == $COMMITING if ( (exists $tid_stat{$tid}) && ($tid_stat{$tid} == $COMMITING) ){ return 1; } else { print "ABORT tid: $tid is in ".&get_tid_stat($tid)." mode\n"; return 0; } }elsif ($type eq "end"){ #end -> tid status == $ENDING if ( (exists $tid_stat{$tid}) && ($tid_stat{$tid} == $ENDING) ){ return 1; } else { print "END tid: $tid is in ".&get_tid_stat($tid)." mode\n"; return 0; } } else { #all the other recognized commands are valid return 1; } } sub do_command { my ($tid, $type, $anum, $value, $oldvalue) = @_; if ($type eq "begin") { &handle_begin($tid, $type, $anum, $value, $oldvalue); } elsif ($type eq "init") { &handle_init($tid, $type, $anum, $value, $oldvalue); } elsif ($type eq "write") { &handle_write($tid, $type, $anum, $value, $oldvalue); } elsif ($type eq "debit") { &handle_debit($tid, $type, $anum, $value, $oldvalue); } elsif ($type eq "credit") { &handle_credit($tid, $type, $anum, $value, $oldvalue); } elsif ($type eq "commit") { &handle_commit($tid, $type, $anum, $value, $oldvalue); } elsif ($type eq "end") { &handle_end($tid, $type, $anum, $value, $oldvalue); } elsif ($type eq "abort") { &handle_abort($tid, $type, $anum, $value, $oldvalue); } else { print "Command $type not yet implemented \n"; } } sub handle_begin{ my ($tid, $type, $anum, $value, $oldvalue) = @_; $tid_stat{$tid} = $COMMITING; #write a START record to log print "Beginning id: $tid\n"; print LOG "type: START action_id: $tid\n"; } sub handle_init{ my ($tid, $type, $anum, $value, $oldvalue) = @_; #wrie a UPDATE record with old value = NULL print "Initializing account $anum to $value\n"; print LOG "type: UPDATE action_id: $tid variable: $anum redo: \"$value\" undo: NULL\n"; #set the value in db $current_db{$anum} = $value; &write_to_disk($tid, $anum, $value); } sub handle_write{ #later change this to (DEBIT/CREDIT) !!! my ($tid, $type, $anum, $value, $oldvalue) = @_; #write a UPDATE record $oldvalue = $current_db{$anum}; print "Setting account $anum to $value\n"; print LOG "type: UPDATE action_id: $tid variable: $anum redo: \"$value\" undo: \"$oldvalue\"\n"; #update the ondisk cache $current_db{$anum} = $value; &write_to_disk($tid, $anum, $value, 0); } sub handle_debit{ my ($tid, $type, $anum, $value, $oldvalue) = @_; #write a UPDATE record $oldvalue = $current_db{$anum}; print "Debiting account $anum by $value\n"; $value = $oldvalue-$value; print LOG "type: UPDATE action_id: $tid variable: $anum redo: \"$value\" undo: \"$oldvalue\"\n"; #update the ondisk cache $current_db{$anum} = $value; &write_to_disk($tid, $anum, $value, 0); } sub handle_credit{ my ($tid, $type, $anum, $value, $oldvalue) = @_; #write a UPDATE record $oldvalue = $current_db{$anum}; print "Crediting account $anum by $value\n"; $value += $oldvalue; print LOG "type: UPDATE action_id: $tid variable: $anum redo: \"$value\" undo: \"$oldvalue\"\n"; #update the ondisk cache $current_db{$anum} = $value; &write_to_disk($tid, $anum, $value, 0); } sub handle_commit{ my ($tid, $type, $anum, $value, $oldvalue) = @_; #write a OUTCOME record with COMMITTED print "Committing id: $tid\n"; print LOG "type: OUTCOME action_id: $tid status: COMMITTED\n"; $tid_stat{$tid} = $ENDING; } sub handle_abort{ my ($tid, $type, $anum, $value, $oldvalue) = @_; #undo all the actions of this tid print "Aborting id: $tid\n"; &undo_tid($tid); #write a OUTCOME record with ABORTED print LOG "type: OUTCOME action_id: $tid status: ABORTED\n"; $tid_stat{$tid} = $ENDING; } sub handle_end{ my ($tid, $type, $anum, $value, $oldvalue) = @_; #flush all the records to disk print "Ending id: $tid\n"; &flush_updates($tid); #write an END record print LOG "type: END action_id: $tid\n"; $tid_stat{$tid} = $DONE; } sub write_to_disk{ my (@write_command) = @_; #write command ($tid, $anum, $value, $delete) #if( (($mode == $standardmode) && &random_bit()) || if($mode == $undoMode){ &forced_write_to_disk(@write_command); } else { #add to the pending writes push(@pending_writes,\@write_command); } } sub forced_write_to_disk{ my ($tid, $anum, $value, $delete) = @_; if($delete){ delete($on_disk_db{$anum}); } else { $on_disk_db{$anum} = $value; } } sub flush_updates{ my($tid) = $_[0]; #go through the pending_writes (start to end) foreach $write (@pending_writes){ @write_command = @{$write}; #look for those matching the $tid if($tid == $write_command[0]){ #write them to disk &forced_write_to_disk(@write_command); } } } #undo a tid #read the whole log (from start) and find all the writes corresponding to this tid and add them to a table #starting from the last one do all the undos to the in_memory_db and use write_to disk sub undo_tid{ my($tid) = $_[0]; my(@updates) = (); #reset the LOG to start close(LOG); open(LOG,"){ @fields = split(/\s+/,$line); if(($fields[3] == $tid) && ($fields[1] eq "UPDATE")){ #($tid, $anum, $value, $delete) #create the write command if($fields[9] =~ /\"(\S+)\"/){ #do not delete &push_to_array(\@updates, $tid, $fields[5], $1 , 0); } else { #delete &push_to_array(\@updates, $tid, $fields[5], 0 , 1); } } } close(LOG); open(LOG,">>LOG") or die "Could not open LOG for writing\n"; #undo the actions while(@updates){ $array = pop(@updates); ($tid, $anum, $value, $delete)= @{$array}; #update the in-memory db if($delete){ #a delete delete($current_db{$anum}); } else { $current_db{$anum} = $value; } &write_to_disk($tid, $anum, $value, $delete); } } sub push_to_array{ my($array, @params)=@_; push(@{$array},\@params); } sub process_command_input { # Read commands from STDIN and process them. my ($line) = (); my @command = (); open(LOG, ">>LOG") or die "Could not open LOG for writing\n"; print "> "; while ($line = ) { @command = (); if ($line =~ /begin\s+(\d+)\s*$/) { @command = ($1, "begin", 0,0,0); } elsif ($line =~ /create_account\s+(\d+)\s+(\S+)\s+(\d+)\s*$/) { @command = ($1, "init", $2,$3,0); } elsif ($line =~ /write\s+(\d+)\s+(\S+)\s+(\d+)\s*$/) { @command = ($1, "write", $2,$3,0); } elsif ($line =~ /debit_account\s+(\d+)\s+(\S+)\s+(\d+)\s*$/) { @command = ($1, "debit", $2,$3,0); } elsif ($line =~ /credit_account\s+(\d+)\s+(\S+)\s+(\d+)\s*$/) { @command = ($1, "credit", $2,$3,0); } elsif ($line =~ /commit\s+(\d+)\s*$/) { @command = ($1, "commit", 0,0,0); } elsif ($line =~ /abort\s+(\d+)\s*$/) { @command = ($1, "abort", 0,0,0); } elsif ($line =~ /end\s+(\d+)\s*$/) { @command = ($1, "end", 0,0,0); } elsif ($line =~ /show_state/) { &print_db(); #next; } elsif ($line =~ /crash/) { &do_crash(); } elsif ($line =~ /checkpoint/) { &do_checkpoint(); #next; } else { print "invalid command: $line"; #next; } if (scalar(@command)) { if(is_valid_command(@command)){ &do_command(@command); #print "A valid command\n"; } else { print "An invalid command\n"; } } print "> "; } &do_crash(); } sub undo_update{ my($name,$new_val,$old_val) = @_; if($old_val =~ /\"(\S+)\"/){ #do not delete #print "Undoing $name from $new_val to $1\n"; $on_disk_db{$name} = $1; } else { #delete #print "Deleting $name\n"; delete($on_disk_db{$name}); } } sub do_crash{ my($key); print "crashing ...\n"; close LOG; #dump the DB to disk open(DB,">DB") or die "Could not open DB for writing\n"; foreach $key ( keys(%on_disk_db) ){ print DB "$key $on_disk_db{$key}\n"; } close(DB); exit(0); } sub recover{ my($key,$val); #read the log printf "Recovering the database ..\n"; open(LOG,"){ @fields = split(/\s+/,$line); &push_to_array(\@recovery_log, @fields); } close(LOG); #read the current DB (to current_db); open(DB,"){ ($key,$val) = split(/\s+/,$line); $on_disk_db{$key} = $val; #print "$key $val\n"; } close(DB); &roll_back(); if($mode != $undoMode){ &forward_scan(); } else { #for the losers add an OUTCOME record print " Logging STATUS records for losers\n"; open(LOG,">>LOG") or die "Could not open LOG for writing\n"; foreach $id (keys(%losers)){ print LOG "type: OUTCOME action_id: $id status: ABORTED\n"; } close(LOG); } #copy the ondisk DB to memory %current_db = %on_disk_db; print "Recovery done\n"; } sub forward_scan{ #we start the forward scan from the beginning of the log, in practice #we would start from last point of the roll back my($entry_l,@entry) = (); my($id,$i); print "Staring forward scan ...\n"; foreach $entry_l (@recovery_log){ @entry = @{$entry_l}; $type = get_log_type($entry[1]); if($type ==0){ next; } if( ($type == $UPDATE) && #an update exists($winners{$entry[3]}) && #a winner ($winners{$entry[3]} eq "COMMITTED") ){ #committed if($entry[7] =~ /\"(\S+)\"/){ $on_disk_db{$entry[5]} = $1; print " REDOING: "; for($i =0; $i < 10; $i++){ print "$entry[$i] "; } print "\n"; } } } #for the winners -> log END open(LOG, ">>LOG") or die "Could not open LOG for writing"; print " Logging END records for winners\n"; foreach $id (keys(%winners)){ #add a done record aswell print LOG "type: END action_id: $id\n"; } close(LOG); print "Forward scan done\n"; } sub roll_back{ #for each record my($entry_l,@entry, $i,$line_c) = (); my($in_done, $in_winners, $in_losers) = (); my($done_cp,$tid,$type); my(%pending_winners) =(); my(%pending_losers) =(); my($key,@a,@b,@c) = (); my(@templog) = reverse(@recovery_log); $done_cp =0; %winners=(); %losers=(); %done=(); $line_c =0; print "Starting rollback ...\n"; foreach $entry_l (@templog){ @entry = @{$entry_l}; $type = &get_log_type($entry[1]); if($type ==0){ next; } $line_c++; if($type != $CHECKPOINT){ $tid = scalar($entry[3]); } if($type == $CHECKPOINT){ #process the checkpoint for($i =3 ; $i< @entry; $i++){ if($entry[$i] eq "COMMITTED:"){ $i++; last; } #found a loser -> if this is not in winners or done, add to losers if($entry[$i] eq "id:"){ next; } if( !exists($winners{$entry[$i]}) && !exists($done{$entry[$i]}) ){ $losers{$entry[$i]} = $EMPTY; $pending_losers{$entry[$i]} = $EMPTY; } #printf "Loser $entry[$i]\n"; } for( ; $i< @entry; $i++){ if($entry[$i] eq "DONE:"){ $i++; last; } if($entry[$i] eq "id:"){ next; } #found a winner -> if this is not in done, add to winner if( !exists($done{$entry[$i]}) ){ $winners{$entry[$i]} = $EMPTY; $pending_winners{$entry[$i]} = $EMPTY; } #printf "Winner $entry[$i]\n"; } for( ; $i< @entry; $i++){ #found done -> add to done #printf "Done $entry[$i]\n"; if($entry[$i] eq "id:"){ next; } $done{$entry[$i]} = $DONE; } $done_cp = 1; }elsif ($done_cp){ # when pending_winners and pending_losers are empty - we are done if( ((scalar(keys %pending_winners)==0) && ($mode == $redoMode )) || ((scalar(keys %pending_losers )==0) && ($mode == $undoMode )) ){ $line_c-- ; #pending winners/losers became empty on the previous line last; } #Get status of the winners that were at the checkpoint if( ($type ==$OUTCOME) && exists($winners{$tid}) ){ #printf "Winner status: $entry[5]\n"; $winners{$tid} = $entry[5]; } if($type ==$START){ if( exists($pending_winners{$tid}) ){ delete($pending_winners{$tid}); } if( exists($pending_losers{$tid}) ){ #remove from pending_losers delete($pending_losers{$tid}); } } elsif ($type == $UPDATE){ if( exists($losers{$tid}) ){ if($mode != $redoMode){ print " UNDOING: "; for($i =0; $i < 10; $i++){ print "$entry[$i] "; } print "\n"; &undo_update($entry[5],$entry[7],$entry[9]); } } } } else { #haven't found a checkpoint yet if($type == $ENDED){ #done <- done + log_record $done{$tid} = $EMPTY; } if(($type == $OUTCOME) && !exists($done{$tid})){ #winners <- winners + log_record #pending_winners <- pending_winners + action_id; $winners{$tid} = $entry[5] ; #note down the status of the winner $pending_winners{$tid} = $EMPTY; } if( !exists($winners{$tid}) && !exists($done{$tid}) ){ if( !exists($losers{$tid}) ){ #losers <- losers +actionID #pending_losers <- pending_losers + action_id $losers{$tid} = $EMPTY; $pending_losers{$tid} = $EMPTY; } if($type == $UPDATE){ if($mode != $redoMode){ print " UNDOING: "; for($i =0; $i < 10; $i++){ print "$entry[$i] "; } print "\n"; &undo_update($entry[5],$entry[7],$entry[9]); } } } if($type == $START){ if(exists($pending_losers{$tid})){ #remove action_id from pending_losers delete($pending_losers{$tid}); } if(exists($pending_winners{$tid})){ #remove action_id from pending_winners delete($pending_winners{$tid}); } } } } print " The log was rolled back $line_c lines\n"; #display the winners losers and dones print "Rollback done\n"; print "Winners: "; foreach $key (keys(%winners)){ print "id: $key "; } print "Losers: "; foreach $key (keys(%losers)){ print "id: $key "; } print "Done: "; foreach $key (keys(%done)){ print "id: $key "; } print "\n"; } #sub random_bit{ # $rand = ( 21*$rand + 3)%100; # if($rand<50){ # return 0; # } else { # return 1; # } #} sub do_checkpoint{ #WINNERS -> COMMITTED (ENDING) #LOSERS -> PENDING (COMMITTING) my(@winners) = (); my(@losers) = (); my(@done) = (); my($key, $val) = (); print "Doing checkpoint\n"; foreach $key ( keys(%tid_stat) ){ $val = $tid_stat{$key}; if($val == $COMMITING){ #FIND THE ONES THAT ARE COMMITTING (LOSERS) push( @losers ,$key); } elsif($val == $ENDING){ #FIND THE ONES THAT ARE ENDING (WINNERS); push( @winners, $key); } elsif($val == $DONE){ #FIND THE ONES THAT ARE DONE (DONE); push(@done, $key); } } #log the checkpoint print LOG "type: CHECKPOINT "; print LOG "PENDING: "; foreach $val (@losers){ print LOG "id: $val "; } print LOG "COMMITTED: "; foreach $val (@winners){ print LOG "id: $val "; } print LOG "DONE: "; foreach $val (@done){ print LOG "id: $val "; } print LOG "\n"; } sub usage { print "usage : trans [-reset] [-redo|-undo]\n"; } sub print_db{ &dump_db(); &dump_log(); } sub dump_log{ close(LOG); open(LOG,"){ print $line; } print "-----------------------\n"; close(LOG); open(LOG,">>LOG") or die "Could not open LOG for writing\n"; } sub dump_db{ #dump on_disk_db my($val,$key) = (); print "\n"; print "-----------------------\n"; print "On-disk DB contents:\n"; foreach $key ( keys(%on_disk_db) ){ $val = $on_disk_db{$key}; print "Account: $key Value: $val\n"; } print "-----------------------\n"; } sub main { my @args = @_; my $arg; while ($arg = shift (@args)) { if ($arg =~ /-reset/) { system("rm", "-f", "LOG", "DB"); } elsif ($arg =~ /-redo/) { $mode = $redoMode; } elsif ($arg =~ /-undo/) { $mode = $undoMode; #} elsif ($arg =~ /-standard/) { # $mode = $standardmode } else { &usage(); exit(0); } } if (open(LOG, "LOG")) { close(LOG); &recover(); } else { print "Opening new log\n"; } &process_command_input(); } $SIG{'INT'} = 'do_crash'; &main(@ARGV);