import numpy,operator # compute hamming distance of two bit sequences def hamming(s1,s2): return sum(map(operator.xor,s1,s2)) # xor together all the bits in an integer def xorbits(n): result = 0 while n > 0: result ^= (n & 1) n >>= 1 return result def expected_parity(from_state,to_state,k,glist): # x[n] comes from to_state # x[n-1] ... x[n-k-1] comes from from_state x = ((to_state >> (k-2)) << (k-1)) + from_state return [xorbits(g & x) for g in glist] def convolutional_encoder(bits,k,glist): result = [0] * (len(bits)*len(glist)) state = 0 index = 0 for b in bits: state = (b << (k-1)) + (state >> 1) for g in glist: result[index] = xorbits(state & g) index += 1 return numpy.array(result,dtype=numpy.float) def ber(xmit,received): return numpy.sum((xmit != received) * 1)/float(len(xmit)) cksum_tests = ( ((0xD4,0x02),10,28049623), ((54, 45, 0, 117, 27, 184, 168, 109, 148, 178),80,283706375), ((54, 219, 17, 17, 67, 221, 99, 4, 68, 87, 96, 54, 219, 17, 17, 67, 221, 99, 4, 68, 87, 96, 54, 219, 17, 0, 67, 221, 99, 1, 68, 87, 96, 132, 200, 174, 107, 167, 121, 39, 224, 168, 81, 91),352,1570574572), ((219, 60, 116, 50, 91, 219, 238, 171, 106, 188, 27, 236, 14, 0, 147, 94, 21, 127, 253, 72, 23, 29, 138, 52, 145, 161, 148, 120, 24, 223, 16, 132, 33, 94, 202, 190, 122, 129, 208, 26, 46, 146, 179, 63, 11, 253, 217, 13, 207, 52, 128, 212, 84, 45, 116, 68, 160, 52, 151, 97, 189, 157, 67, 95, 121, 176, 171, 171, 54, 86, 4, 155, 89, 50, 53, 226, 176, 119, 199, 106, 177, 145, 139, 133, 73, 62, 169, 126, 21, 94, 235, 148, 91, 53, 118, 253, 29, 97, 68, 177, 41, 98, 250, 150, 9, 54, 53, 152, 34, 193, 14, 214, 135, 196, 140, 157, 179, 1, 2, 182, 231, 55, 166, 15, 105, 39, 76, 106, 132, 39, 159, 63, 54, 7, 23, 111, 164, 38, 101, 222, 224, 77, 221, 80, 150, 65, 252, 191, 13, 191, 67, 66, 250, 2, 208, 244, 217, 228, 245, 166, 51, 92, 188, 157, 191, 107, 20, 82, 38, 236, 69, 217, 162, 81, 214, 57, 206, 190, 84, 234, 46, 183, 46, 2, 105, 60, 95, 6, 202, 97, 8, 165, 175, 5, 137, 203, 19, 86, 100, 209, 129, 177, 184, 163, 154, 188, 107, 101, 2, 235, 12, 31, 221, 93, 10, 62, 129, 231, 129, 197, 202, 86, 242, 220, 165, 14, 147, 147, 45, 0, 10, 128, 172, 47, 81, 147, 1, 6, 94, 46, 5, 52, 60, 31, 181, 60, 121, 82, 220, 61, 44, 251, 137, 40, 35, 49, 41, 104, 243, 26, 95, 222, 174, 152, 25, 130, 135, 69, 117, 6, 171, 4, 126, 142, 189, 26, 81, 113, 200, 108, 69, 62, 191, 66, 89, 255, 83, 219, 70, 48, 127, 2, 184, 177, 123, 2, 98, 116, 46, 68, 242, 92, 39, 55, 253, 127, 211, 162, 17, 210, 78, 43, 244, 10, 86, 237, 39, 246, 210, 215, 194, 156, 74, 99, 97, 115, 173, 22, 70, 202, 101, 162, 123, 208, 25, 59, 254, 45, 190, 98, 120, 181, 242, 4, 29, 172, 213, 219, 141, 123, 141, 105, 174, 201, 239, 108, 212, 162, 248, 152, 23, 216, 122, 76, 80, 224, 67, 143, 79, 99, 89, 90, 107, 109, 224, 182, 93, 137, 105, 154, 42, 15, 172, 58, 12, 229, 220, 200, 244, 101, 19, 203, 140, 194, 106, 100, 133, 18, 185, 206, 222, 167, 197, 202, 28, 244, 89, 73, 56, 167, 91, 15, 23, 158, 121, 142, 30, 133, 207, 28, 248, 43, 115, 190, 65, 129, 142, 37, 180, 159, 125, 29, 11, 182, 122, 125, 12, 200, 6, 95, 62, 161, 164, 71, 142, 176, 60, 182, 42, 146, 97, 131, 110, 161, 215, 138, 95, 98, 28, 110, 66, 192, 94, 180, 59, 55, 197, 199, 51, 253, 212, 218, 198, 139, 145, 53, 120, 39, 107, 185, 208, 27, 33, 245, 228, 60, 176, 235, 78, 83, 43, 103, 149, 150, 137, 22, 30, 171, 4, 126, 142, 189, 26, 81, 113, 200, 108, 69, 62, 191, 66, 89, 255, 83, 219, 70, 48, 127, 2, 184, 177, 123, 2, 98, 116, 46, 68, 242, 92, 39, 55, 253, 127, 211, 162, 17, 210, 78, 43, 244, 10, 86, 237, 39, 246, 210, 215, 194, 156, 74, 99, 97, 115, 173, 22, 70, 202, 101, 162, 123, 208, 25, 59, 254, 45, 190, 98, 120, 181, 242, 4, 29, 172, 213, 219, 141, 123, 141, 105, 174, 201, 239, 108, 212, 162, 248, 152, 23, 216, 122, 76, 80, 224, 67, 143, 79, 99, 89, 90, 107, 109, 224, 182, 93, 137, 105, 154, 42, 15, 172, 58, 12, 229, 220, 200, 244, 101, 19, 203, 140, 194, 106, 100, 133, 18, 185, 206, 222, 167, 197, 202, 28, 244, 89, 73, 56, 167, 91, 15, 23, 158, 121, 142, 30, 133, 207, 28, 248, 43, 115, 190, 65, 129, 142, 37, 180, 159, 125, 29, 11, 182, 122, 125, 12, 200, 6, 95, 62, 161, 164, 71, 142, 176, 60, 182, 42, 146, 97, 131, 110, 161, 215, 138, 95, 98, 28, 110, 66, 192, 94, 180, 59, 55, 197, 199, 51, 253, 212, 218, 198, 139, 145, 53, 120, 39, 107, 185, 208, 27, 33, 245, 228, 60, 176, 235, 78, 83, 43, 103, 149, 150, 137, 22, 30, 179, 151, 253, 223),5000,2947950988) ) def verify_checksum(checksum): for test,len,expected in cksum_tests: bits = numpy.concatenate([int_to_bits(b,8) for b in test]) result = checksum(bits[:len]) if result != expected: print "**** Checksum test failed ****" print "expected:",expected," got:",result print "bit sequence:" print bits return False return True # 2 points for Task 1 def verify_task1(checksum): if verify_checksum(checksum): return 2 else: print "Verify of task 1 failed" return None # 3 points for Task 2 def verify_task2(rect_decode, decode_stream): if test_rect_decode(rect_decode,verbose=False): if test_stream_decode(decode_stream,verbose=False): return 3 else: print "Verify of task 2 failed" return None # 4 points for Task 3 def verify_task3(decoder,debug=False): k = 3 glist = (7,6) d = decoder(k,glist) # message with no errors message = numpy.random.random_integers(0,1,10) sent = convolutional_encoder(message,k,glist) received_message = d.decode(sent,debug=debug) if numpy.any(message != received_message): print "Decoder testing failed (no transmission errors)..." print "Received parity bits:",sent print "Decoded message returned by your decoder: ",received_message print "Expected:",message return None # message with two transmission errors sent[3] = 1 - sent[3] sent[12] = 1 - sent[12] received_message = d.decode(sent,debug=debug) if numpy.any(message != received_message): print "Decoder testing failed (2 transmission errors)..." print "Received parity bits:",sent print "Decoded message returned by your decoder: ",received_message print "Expected:",message return None return 4 # 1 point for Task 4 def verify_task4(decoder): k = 3 glist = (7,6) d = decoder(k,glist) for i in xrange(10): expected = numpy.random.random_integers(0,1,len(glist)) received = numpy.random.rand(len(glist)) dist = d.branch_metric(expected,received) expected_dist = sum([(expected[j] - received[j])**2 for j in xrange(len(glist))]) if numpy.any((dist - expected_dist) > 1e-5): print "soft branch_metric failed..." print "expected voltages:",expected print "received voltages:",received print "value returned by branch_metric:",dist print "expected return value:",expected_dist return None return 1 # compute even parity for a binary sequence (a list of 0's and 1's). # returns 0 if the number of 1's in data is even, else 1 def even_parity(data): return sum(data) % 2 # construct a codeword with row and column parity def codeword(data,nrows,ncols): ndata = len(data) assert ndata == nrows*ncols,"codeword: data must have nrows*ncols bits" result = numpy.zeros(ndata + nrows + ncols,dtype=int) result[:ndata] = data # compute row parity bits for r in xrange(nrows): result[ndata + r] = even_parity(data[r*ncols:(r+1)*ncols]) # compute column parity bits for c in xrange(ncols): result[ndata + nrows + c] = even_parity(data[c:len(data):ncols]) return result # run decoder function though its paces def test_rect_decode(decoder,verbose=True): # test the decoder on a particular value def run_test(test,expected,msg,nrows,ncols): result = decoder(numpy.array(test),nrows,ncols) if numpy.any(result != expected): print "Error detected while testing",msg,"in a %d by %d block" % (nrows,ncols) print "Test code word:",test print "Expected:",expected print "Received:",result return True return False # test all possible data values for a rectangular code nrows = 2 ncols = 4 for i in xrange(1 << (nrows*ncols)): data = numpy.array(int_to_bits(i,nrows*ncols)) if run_test(codeword(data,nrows,ncols),data, "valid codewords", nrows,ncols): return False # test all single-bit errors (should be corrected) for a rectangular code nrows = 4 ncols = 7 data = 6172536038 good = codeword(int_to_bits(data,nrows*ncols),nrows,ncols) # now test all possible single errors for i in xrange(nrows*ncols): bad = numpy.array(good) bad[i] = 1 - bad[i] if run_test(bad,good[0:nrows*ncols], "correctable single-bit errors (bit %d)" % i, nrows,ncols): return False if verbose: print "Rect parity decoding of single block tested successfully: congratulations!" return True def test_stream_decode(decoder,verbose=True): nrows = 2 ncols = 2 nbits = 59*nrows*ncols # kind of a random choice (59) message = numpy.random.random_integers(0,1,nbits) cw = [] for i in range(0,nbits,nrows*ncols): cw.extend(codeword(message[i:i+nrows*ncols], nrows, ncols)) result = decoder(numpy.array(cw), nrows, ncols) if numpy.any(result != message): print "Error detected while testing", message print "Decoded wrongly as", result print "The correct codeword is", cw return False nrows = 4 ncols = 3 nbits = 39*nrows*ncols # kind of a random choice (39) message = numpy.random.random_integers(0,1,nbits) cw = [] for i in range(0,nbits,nrows*ncols): cw.extend(codeword(message[i:i+nrows*ncols], nrows, ncols)) result = decoder(cw, nrows, ncols) if numpy.any(result != message): print "Error detected while testing", message print "Decoded wrongly as", result print "The correct codeword is", cw return False if verbose: print "Stream decoding succeeded: congratulations!" return True # convert nbits of int into bit sequences, lsb first def int_to_bits(n,nbits): return [(n >> i) & 1 for i in xrange(nbits)] # convert bit sequence (lsb first) to an int def bits_to_int(bits): result = 0 for i in xrange(len(bits)): result += bits[i] * (1 << i) return int(result) ################################################## ## ## Code to submit task to server. Do not change. ## Task-specific code is in verify(), defined above. ## ################################################## import Tkinter class Dialog(Tkinter.Toplevel): def __init__(self, parent, title = None): Tkinter.Toplevel.__init__(self, parent) self.transient(parent) if title: self.title(title) self.parent = parent body = Tkinter.Frame(self) self.initial_focus = self.body(body) body.pack(padx=5, pady=5) self.buttonbox() self.grab_set() if not self.initial_focus: self.initial_focus = self self.protocol("WM_DELETE_WINDOW", self.cancel) self.geometry("+%d+%d" % (parent.winfo_rootx()+50,parent.winfo_rooty()+50)) self.initial_focus.focus_set() self.wait_window(self) def body(self, master): return None # add standard button box def buttonbox(self): box = Tkinter.Frame(self) w = Tkinter.Button(box, text="Ok", width=10, command=self.ok, default=Tkinter.ACTIVE) w.pack(side=Tkinter.LEFT, padx=5, pady=5) box.pack() # standard button semantics def ok(self, event=None): if not self.validate(): self.initial_focus.focus_set() # put focus back return self.withdraw() self.update_idletasks() self.apply() self.cancel() def cancel(self, event=None): # put focus back to the parent window self.parent.focus_set() self.destroy() # command hooks def validate(self): return 1 # override def apply(self): pass # override # ask user for Athena username and MIT ID class SubmitDialog(Dialog): def __init__(self,parent,error=None,title = None): self.error = error self.athena_name = None self.mit_id = None Dialog.__init__(self,parent,title=title) def body(self, master): row = 0 if self.error: l = Tkinter.Label(master,text=self.error, anchor=Tkinter.W,justify=Tkinter.LEFT,fg="red") l.grid(row=row,sticky=Tkinter.W,columnspan=2) row += 1 Tkinter.Label(master, text="Athena username:").grid(row=row,sticky=Tkinter.E) self.e1 = Tkinter.Entry(master) self.e1.grid(row=row, column=1) row += 1 Tkinter.Label(master, text="MIT ID:").grid(row=row,sticky=Tkinter.E) self.e2 = Tkinter.Entry(master) self.e2.grid(row=row, column=1) return self.e1 # initial focus # add standard button box def buttonbox(self): box = Tkinter.Frame(self) w = Tkinter.Button(box, text="Submit", width=10, command=self.ok, default=Tkinter.ACTIVE) w.pack(side=Tkinter.LEFT, padx=5, pady=5) w = Tkinter.Button(box, text="Cancel", width=10, command=self.cancel) w.pack(side=Tkinter.LEFT, padx=5, pady=5) box.pack() def apply(self): self.athena_name = self.e1.get() self.mit_id = self.e2.get() # Let user know what server said class MessageDialog(Dialog): def __init__(self, parent,message = '',title = None): self.message = message Dialog.__init__(self,parent,title=title) def body(self, master): l = Tkinter.Label(master, text=self.message,anchor=Tkinter.W,justify=Tkinter.LEFT) l.grid(row=0) # return contents of file as a string def file_contents(fname): # use universal mode to ensure cross-platform consistency in hash f = open(fname,'U') result = f.read() f.close() return result import hashlib def digest(s): m = hashlib.md5() m.update(s) return m.hexdigest() # if verify(f) indicates points have been earned, submit results # to server if requested to do so import inspect,os,urllib,urllib2 def checkoff(f,g=None,task='???',submit=True,debug=False): if task == 'L4_1': points = verify_task1(f) elif task == 'L4_2': points = verify_task2(f,g) elif task == 'L4_3': points = verify_task3(f,debug=debug) elif task == 'L4_4': points = verify_task4(f) else: raise ValueError,"task must be one of L4_1, L4_2, L4_3, or L4_4" if submit and points: root = Tkinter.Tk(); #root.withdraw() error = None while submit: sd = SubmitDialog(root,error=error,title="Submit Task %s?"%task) if sd.athena_name: if isinstance(f,str): fname = os.path.abspath(f) else: fname = os.path.abspath(inspect.getsourcefile(f)) post = { 'user': sd.athena_name, 'id': sd.mit_id, 'task': task, 'digest': digest(file_contents(os.path.abspath(inspect.getsourcefile(checkoff)))), 'points': points, 'filename': fname, 'file': file_contents(fname) } try: response = urllib2.urlopen('http://scripts.mit.edu/~6.02/currentsemester/submit_task.cgi', urllib.urlencode(post)).read() except Exception,e: response = 'Error\n'+str(e) if response.startswith('Error\n'): error = response[6:] else: MessageDialog(root,message=response,title='Submission response') break else: break root.destroy()