import numpy
import os
import channel

try:
    reload  # builtin on Python 2
except NameError:
    from importlib import reload  # Python 3 moved reload() into importlib
reload(channel)


# convert digital sample sequence into voltages; result is numpy array.
# samples_per_bit can be a float (eg, 8.1) to model clock
# drift in the transmitter.
# NOTE JKW; NO DEFAULT VALUES FOR SAMPLES_PER_BIT, causes confusion
def bits_to_samples(bits, samples_per_bit, npreamble=0, npostamble=0,
                    v0=0.0, v1=1.0, repeat=1):
    """
    generate sequence of voltage samples:
      bits: binary message sequence
      npreamble: number of leading v0 samples
      npostamble: number of trailing v0 samples
      samples_per_bit: number of samples for each message bit
      v0: voltage to output for 0 bits
      v1: voltage to output for 1 bits
      repeat: how many times to repeat whole shebang
    returns a numpy array of voltages
    """
    # Preallocate samples list and default to all v0's
    samples = [v0]*(npreamble + int(len(bits)*samples_per_bit) + npostamble)
    # index is kept as a (possibly fractional) position so that a
    # non-integer samples_per_bit accumulates drift from bit to bit
    index = npreamble
    for bit in bits:
        if bit != 0:
            start = int(index)
            end = int(index + samples_per_bit)
            samples[start:end] = [v1]*(end-start)
        index = index + samples_per_bit
    samples = samples * repeat
    return numpy.array(samples)


# convert nbits of int into bit sequences, lsb first
def int_to_bits(n, nbits=8):
    """Return the low nbits bits of integer n as a list, LSB first."""
    return [(n >> i) & 1 for i in range(nbits)]


# encode one integer into a sequence of bits
#  prefix = sequence of bits that appear before data bits
#  postfix = sequence of bits that appear after data bits
#  nbits = number of data bits to encode (LSB first)
def encode_data(data, prefix=[1], postfix=[0], nbits=8):
    """Return prefix + nbits data bits of integer data (LSB first) + postfix.

    The default prefix/postfix lists are only read, never modified.
    """
    # start with a copy of the prefix so the caller's list is untouched
    result = list(prefix)
    # encode data LSB first, honoring the requested width
    # (the width used to be hard-wired to 8 regardless of nbits)
    result.extend(int_to_bits(data, nbits=nbits))
    # end with postfix bits
    result.extend(postfix)
    return result


# encode numpy array of bits in to an array of encoded bits of
# nbits-length words with prefix and postfix
#  prefix = sequence of bits that appear before data bits
#  postfix = sequence of bits that appear after data bits
#  nbits = number of data bits to encode
def encode_bits(data, prefix=[1], postfix=[0], nbits=8):
    """Frame every nbits-long word of data with prefix and postfix.

    Returns an int numpy array holding 5 zero bits, the framed words
    back to back, and 5 more zero bits.  len(data) must be a multiple
    of nbits (checked with an assert).
    """
    assert len(data) % nbits == 0, "Data length incorrect size!"
    # word count follows nbits (it used to assume 8-bit words)
    num_words = len(data) // nbits
    pad = 5
    word_len = nbits + len(prefix) + len(postfix)
    enc_bits = numpy.zeros(num_words*word_len + 2 * pad, int)
    offset = pad
    for i in range(num_words):
        word = list(prefix) + list(data[nbits*i:nbits*(i+1)]) + list(postfix)
        enc_bits[offset:offset+word_len] = word
        offset = offset + word_len
    return enc_bits


# encode string.  See encode_data for details...
def encode_string(s, prefix=[1], postfix=[0], nbits=8):
    """Encode each character of s with encode_data, padded by 5 zeros
    at both ends.  Returns a list of bits."""
    result = [0]*5   # start with some zeros
    for ch in s:
        result.extend(encode_data(ord(ch), prefix=prefix, postfix=postfix,
                                  nbits=nbits))
    result += [0]*5  # end with some zeros
    return result


# convert bit sequence (lsb first) to an int
def bits_to_int(bits):
    """Return the integer whose LSB-first binary digits are bits."""
    # multiply rather than shift the bit itself so that float-valued
    # bit arrays (as produced by farray_to_bits) are accepted too
    result = 0
    for i, bit in enumerate(bits):
        result += bit * (1 << i)
    return int(result)


# Convert a floating point number between zero and one to bits
def float_to_bits(flt, nbits=8):
    """Quantize flt (0.0 .. 1.0) to nbits bits, returned LSB first."""
    assert flt < 1.0+1.0e-12 and flt > 0.0-1.0e-12, "floats out of range"
    # pass nbits through; the width used to fall back to the default 8
    return int_to_bits(int(flt*(2**nbits-1)), nbits)


# Convert nbits (lsb first) back to a float between zero and one
def bits_to_float(bits, nbits=8):
    """Inverse of float_to_bits: scale the integer value of bits by
    1/(2**nbits - 1)."""
    return float(bits_to_int(bits)/float(2**nbits-1))


# Convert a floating point array between zero and one to bits
def farray_to_bits(farray, nbits=8):
    """Quantize every element of farray to nbits bits (LSB first).

    Returns a float numpy array of length len(farray)*nbits.
    """
    # accept plain sequences as well: list * int would repeat the list
    # instead of scaling its values
    farray = numpy.asarray(farray, float)
    if len(farray) == 0:
        return numpy.zeros(0)
    assert max(farray) < 1.0+1.0e-12 and min(farray) > 0.0-1.0e-12, "floats out of range"
    ints = numpy.array(farray*(2**nbits-1), int)
    bits = numpy.zeros(len(ints)*nbits)
    index = 0
    for value in ints:
        # honor nbits; an 8-bit list does not fit an nbits-wide slice
        bits[index:index+nbits] = int_to_bits(value, nbits)
        index = index + nbits
    return bits


# Convert bits to a floating point array
def bits_to_farray(bits, nbits=8):
    """Inverse of farray_to_bits: one float per nbits-long group."""
    assert len(bits) % nbits == 0, "bits not a multiple of %d" % nbits
    scale = float(2**nbits-1)
    # integer division: numpy.zeros needs an int size
    farray = numpy.zeros(len(bits)//nbits, float)
    index = 0
    for i in range(len(farray)):
        farray[i] = float(bits_to_int(bits[index:index+nbits]))/scale
        index = index + nbits
    return farray


# 8b/10 encoder
# two encoder tables: 5b/6b and 3b/4b
# each entry is a tuple of ints
# the two ints of each entry should be the same, or be complements
# RD is -1: use tuple[0]; if
#   tuple[0]!=tuple[1], set RD to +1
# RD is +1: use tuple[1]; if tuple[0]!=tuple[1], set RD to -1
# Refinement implemented by the encoder below: RD is only flipped when
# the code actually emitted is unbalanced (unequal number of ones and
# zeros).  D.07 and D.x.3 have two complementary codes that are both
# balanced, so they are selected by RD but leave RD unchanged.
table_5b_6b = [
    (0x39,0x06),  # D.00
    (0x2E,0x11),  # D.01
    (0x2D,0x12),  # D.02
    (0x23,0x23),  # D.03
    (0x2B,0x14),  # D.04
    (0x25,0x25),  # D.05
    (0x26,0x26),  # D.06
    (0x07,0x38),  # D.07
    (0x27,0x18),  # D.08
    (0x29,0x29),  # D.09
    (0x2A,0x2A),  # D.10
    (0x0B,0x0B),  # D.11
    (0x2C,0x2C),  # D.12
    (0x0D,0x0D),  # D.13
    (0x0E,0x0E),  # D.14
    (0x3A,0x05),  # D.15
    (0x36,0x09),  # D.16
    (0x31,0x31),  # D.17
    (0x32,0x32),  # D.18
    (0x13,0x13),  # D.19
    (0x34,0x34),  # D.20
    (0x15,0x15),  # D.21
    (0x16,0x16),  # D.22
    (0x17,0x28),  # D.23
    (0x33,0x0C),  # D.24
    (0x19,0x19),  # D.25
    (0x1A,0x1A),  # D.26
    (0x1B,0x24),  # D.27
    (0x1C,0x1C),  # D.28
    (0x1D,0x22),  # D.29
    (0x1E,0x21),  # D.30
    (0x35,0x0A),  # D.31
    ]

# index by (byte >> 5) & 0x7
# use D.x.A7 when
#   if RD is -1, and x is 17, 18, 20
#   if RD is +1, and x 11, 13, 14
table_3b_4b = [
    (0xD,0x2),  # D.x.0
    (0x9,0x9),  # D.x.1
    (0xA,0xA),  # D.x.2
    (0x3,0xC),  # D.x.3
    (0xB,0x4),  # D.x.4
    (0x5,0x5),  # D.x.5
    (0x6,0x6),  # D.x.6
    (0x7,0x8),  # D.x.P7
    (0xE,0x1),  # D.x.A7
    ]

# 10-bit sync to use for RD==-1, RD==+1:
sync = (0x17C,0x283)


# (sync,updated rd) = encode_sync_8b10b(rd)
def encode_sync_8b10b(rd):
    """Return (10-bit sync code, updated running disparity) for rd.

    The sync code always flips the running disparity.
    """
    if rd == -1:
        return (sync[0], 1)
    return (sync[1], -1)


def _select_subblock(pair, rd, width):
    """Pick the code of a table entry for running disparity rd.

    pair is a (code for RD -1, code for RD +1) table entry and width is
    the number of bits in the code.  Returns (code, updated rd); rd is
    flipped only when the chosen code has unequal ones and zeros.
    """
    if rd == -1:
        code, flipped_rd = pair[0], 1
    else:  # rd == 1
        code, flipped_rd = pair[1], -1
    if 2 * bin(code).count("1") != width:
        rd = flipped_rd
    return code, rd


# (10b,updated rd) = encode_8b_10b(8b,rd)
def encode_data_8b10b(ch, rd):
    """Encode byte ch as a 10-bit 8b/10b data code.

    ch: integer; only its low 8 bits are used
    rd: running disparity before this byte (-1 or +1)
    Returns (code, updated rd).  The 6-bit sub-block occupies bits 0-5
    of code and the 4-bit sub-block bits 6-9.
    """
    # encode low-order 5 bits with the 5b/6b table
    x = ch & 0x1F
    xcode, rd = _select_subblock(table_5b_6b[x], rd, 6)

    # encode high-order 3 bits with the 3b/4b table, using the running
    # disparity as it stands after the 6-bit sub-block
    y = (ch >> 5) & 0x7
    y_4b = table_3b_4b[y]
    if rd == -1:
        use_alternate = (y == 7 and x in (17, 18, 20))
    else:  # rd == 1
        use_alternate = (y == 7 and x in (11, 13, 14))
    if use_alternate:
        y_4b = table_3b_4b[8]  # use D.x.A7
    # the old test compared y_4b[0] with itself, so the running
    # disparity was never updated after this sub-block
    ycode, rd = _select_subblock(y_4b, rd, 4)

    return ((ycode << 6) + xcode, rd)


# encode string with 8b/10b inserting syncs every so often
def encode_string_8b10b(s, sync_interval=16):
    """Encode string s as a list of 8b/10b bits.

    A sync code is inserted before every sync_interval-th character
    (starting with the first).  The result starts with 15 junk bits and
    ends with 5 zero bits.
    """
    result = [0,1,1]*5   # start with some garbage
    rd = -1              # running disparity starts at -1
    for count, ch in enumerate(s):
        if (count % sync_interval) == 0:
            sync_code, rd = encode_sync_8b10b(rd)
            result.extend(int_to_bits(sync_code, 10))
        code, rd = encode_data_8b10b(ord(ch), rd)
        result.extend(int_to_bits(code, 10))
    result += [0]*5  # end with some zeros
    return result


# encode numpy array of bits in to an array of 8b/10b encoded bits
def encode_bits_8b10b(data, sync_interval=16):
    """Encode a bit sequence (8 bits per byte, LSB first) with 8b/10b.

    Returns an int numpy array: 15 junk bits, then the sync and data
    codes, then zero padding.  len(data) must be a multiple of 8
    (checked with an assert).
    """
    assert len(data) % 8 == 0, "Data length incorrect size!"
    num_words = len(data) // 8
    # room for every sync that can be emitted at this interval (the
    # count used to assume an interval of 16); this may leave one unused
    # sync word of trailing zeros, as it always did for the default
    num_syncs = num_words // sync_interval + 1
    padbegin = 15
    padend = 5
    word_len = 10
    enc_bits = numpy.zeros((num_words+num_syncs)*word_len+padbegin+padend, int)
    enc_bits[:padbegin] = [0,1,1]*5  # Just junk to start
    oset = padbegin
    rd = -1  # running disparity starts at -1
    for count in range(num_words):
        if (count % sync_interval) == 0:
            sync_code, rd = encode_sync_8b10b(rd)
            enc_bits[oset:oset+word_len] = int_to_bits(sync_code, 10)
            oset += word_len
        iset = 8 * count
        code, rd = encode_data_8b10b(bits_to_int(data[iset:iset+8]), rd)
        enc_bits[oset:oset+word_len] = int_to_bits(code, 10)
        oset += word_len
    return enc_bits


# build the reverse lookup table by encoding each character
# with both possible running disparities and then filling
# in the appropriate entries
table_10b_8b = [None] * 1024
for ch in range(256):
    for rd in (-1,1):
        code = encode_data_8b10b(ch,rd)[0]
        #print "0x%x (D.%02x.%x) encodes as 0x%03x when rd=%d" % (ch,ch & 0x1F, ch >> 5,code,rd)
        assert table_10b_8b[code] is None or table_10b_8b[code]==ch,\
               "Oops, duplicate entry in table_10b_8b"
        table_10b_8b[code] = ch

long_message2 = """ F """

long_message = """ Fourscore and seven years ago our fathers brought forth on this continent a new nation, conceived in liberty, and dedicated to the proposition that all men are created equal. Now we are engaged in a great civil war, testing whether that nation, or any nation, so conceived and so dedicated, can long endure. 
We are met on a great battle-field of that war. We have come to dedicate a portion of that field as a final resting place for those who here gave their lives that that nation might live. It is altogether fitting and proper that we should do this. But, in a larger sense, we cannot dedicate... we cannot consecrate... we cannot hallow... this ground. The brave men, living and dead, who struggled here, have consecrated it far above our poor power to add or detract. The world will little note nor long remember what we say here, but it can never forget what they did here. It is for us, the living, rather, to be dedicated here to the unfinished work which they who fought here have thus far so nobly advanced. It is rather for us to be here dedicated to the great task remaining before us... that from these honored dead we take increased devotion to that cause for which they gave the last full measure of devotion; that we here highly resolve that these dead shall not have died in vain; that this nation, under God, shall have a new birth of freedom; and that government of the people, by the people, for the people, shall not perish from the earth. 
November 19, 1863 """


# NOTE(review): transmit() is not defined in the visible part of this
# file; presumably it is supplied by the channel module -- confirm.

# tester for task 1
def task1_test(f):
    "testjig for Lab #1, Task #1 -- expects your plotting function as an argument"
    data = transmit(encode_string('hi there'))
    # data = transmit(encode_string('hi'))
    f(data)


# tester for task 2
def task2_test(f, message):
    "testjig for Lab #1, Task #2 -- expects your receive function and a string as arguments"
    data = transmit(encode_string(message))
    result = f(data)
    if result != message:
        print('expected "%s", got "%s"' % (message, result))
    else:
        print('received message is "%s"' % result)


# tester for task 3
def task3_test(f, message):
    "testjig for Lab #1, Task #3 -- expects your receive function and a string as arguments"
    data = transmit(encode_string_8b10b(message))
    result = f(data)
    if result != message:
        print('expected "%s", got "%s"' % (message, result))
    else:
        print('received message is "%s"' % result)


##################################################
##
## Code to submit task to server.  Do not change.
## Task-specific code is in verify(), defined above.
##
##################################################

try:
    import Tkinter  # Python 2 name
except ImportError:
    import tkinter as Tkinter  # Python 3 renamed the module


class Dialog(Tkinter.Toplevel):
    """Modal dialog base class.

    The constructor builds the window and blocks in wait_window() until
    the dialog is destroyed.  Subclasses override body(), buttonbox(),
    validate() and apply().
    """

    def __init__(self, parent, title=None):
        Tkinter.Toplevel.__init__(self, parent)
        self.transient(parent)
        if title:
            self.title(title)
        self.parent = parent

        body = Tkinter.Frame(self)
        # body() returns the widget that should get the initial focus
        self.initial_focus = self.body(body)
        body.pack(padx=5, pady=5)
        self.buttonbox()
        self.grab_set()
        if not self.initial_focus:
            self.initial_focus = self
        self.protocol("WM_DELETE_WINDOW", self.cancel)
        self.geometry("+%d+%d" % (parent.winfo_rootx()+50,
                                  parent.winfo_rooty()+50))
        self.initial_focus.focus_set()
        self.wait_window(self)

    def body(self, master):
        """Create the dialog body inside master; override in subclasses."""
        return None

    # add standard button box
    def buttonbox(self):
        box = Tkinter.Frame(self)
        w = Tkinter.Button(box, text="Ok", width=10, command=self.ok,
                           default=Tkinter.ACTIVE)
        w.pack(side=Tkinter.LEFT, padx=5, pady=5)
        box.pack()

    # standard button semantics
    def ok(self, event=None):
        if not self.validate():
            self.initial_focus.focus_set()  # put focus back
            return
        self.withdraw()
        self.update_idletasks()
        self.apply()
        self.cancel()

    def cancel(self, event=None):
        # put focus back to the parent window
        self.parent.focus_set()
        self.destroy()

    # command hooks
    def validate(self):
        return 1  # override

    def apply(self):
        pass  # override


# ask user for Athena username and MIT ID
class SubmitDialog(Dialog):
    """Dialog with two entry fields.

    After the dialog closes, athena_name and mit_id hold the entered
    text, or None when the dialog was cancelled.
    """

    def __init__(self, parent, error=None, title=None):
        self.error = error
        self.athena_name = None
        self.mit_id = None
        Dialog.__init__(self, parent, title=title)

    def body(self, master):
        row = 0
        if self.error:
            # show the error from the previous attempt above the fields
            l = Tkinter.Label(master, text=self.error,
                              anchor=Tkinter.W, justify=Tkinter.LEFT, fg="red")
            l.grid(row=row, sticky=Tkinter.W, columnspan=2)
            row += 1
        Tkinter.Label(master, text="Athena username:").grid(row=row, sticky=Tkinter.E)
        self.e1 = Tkinter.Entry(master)
        self.e1.grid(row=row, column=1)
        row += 1
        Tkinter.Label(master, text="MIT ID:").grid(row=row, sticky=Tkinter.E)
        self.e2 = Tkinter.Entry(master)
        self.e2.grid(row=row, column=1)
        return self.e1  # initial focus

    # add standard button box
    def buttonbox(self):
        box = Tkinter.Frame(self)
        w = Tkinter.Button(box, text="Submit", width=10, command=self.ok,
                           default=Tkinter.ACTIVE)
        w.pack(side=Tkinter.LEFT, padx=5, pady=5)
        w = Tkinter.Button(box, text="Cancel", width=10, command=self.cancel)
        w.pack(side=Tkinter.LEFT, padx=5, pady=5)
        box.pack()

    def apply(self):
        self.athena_name = self.e1.get()
        self.mit_id = self.e2.get()


# Let user know what server said
class MessageDialog(Dialog):
    """Dialog that displays message with a single Ok button."""

    def __init__(self, parent, message='', title=None):
        self.message = message
        Dialog.__init__(self, parent, title=title)

    def body(self, master):
        l = Tkinter.Label(master, text=self.message, anchor=Tkinter.W,
                          justify=Tkinter.LEFT)
        l.grid(row=0)


import sys


# return contents of file as a string
def file_contents(fname):
    """Return the text of file fname with newlines normalized."""
    # use universal mode to ensure cross-platform consistency in hash;
    # Python 3 text mode already translates newlines and 3.11 rejects 'U'
    mode = 'U' if sys.version_info[0] < 3 else 'r'
    with open(fname, mode) as f:
        return f.read()


import hashlib


def digest(s):
    """Return the hexadecimal MD5 digest of string s."""
    # hashlib needs bytes; a Python 2 str already is bytes
    if not isinstance(s, bytes):
        s = s.encode('utf-8')
    m = hashlib.md5()
    m.update(s)
    return m.hexdigest()


# if verify(f) indicates points have been earned, submit results
# to server if requested to do so
import inspect
import os
try:  # Python 2
    import urllib2
    import urllib
    _urlopen = urllib2.urlopen
    _urlencode = urllib.urlencode
except ImportError:  # Python 3 split these into urllib.request/parse
    import urllib.parse
    import urllib.request
    _urlopen = urllib.request.urlopen
    _urlencode = urllib.parse.urlencode


def checkoff(f, task='???', submit=True):
    """Ask for the user's credentials and post the task file to the server.

    f: path of the file to submit, or an object whose source file
       (found with inspect.getsourcefile) is submitted
    task: one of 'L1_1', 'L1_2', 'L1_3', 'L1_4'
    submit: when false nothing is shown or sent
    Raises ValueError for an unknown task.
    """
    tasks = ('L1_1','L1_2','L1_3','L1_4')
    if task not in tasks:
        # wrap tasks in a 1-tuple: formatting the 4-tuple directly
        # raised TypeError instead of this ValueError
        raise ValueError("task must be one of %s" % (tasks,))
    points = 'tba'

    if not (submit and points):
        return

    root = Tkinter.Tk(); #root.withdraw()
    try:
        error = None
        while submit:
            sd = SubmitDialog(root, error=error, title="Submit Task %s?"%task)
            if not sd.athena_name:
                break  # cancelled, or no username entered

            if isinstance(f, str):
                fname = os.path.abspath(f)
            else:
                fname = os.path.abspath(inspect.getsourcefile(f))
            # NOTE(review): the digest of this very source file is sent
            # along; presumably the server compares it against a known
            # value, so edits here may need a server-side update -- confirm.
            post = {
                'user': sd.athena_name,
                'id': sd.mit_id,
                'task': task,
                'digest': digest(file_contents(os.path.abspath(inspect.getsourcefile(checkoff)))),
                'points': points,
                'filename': fname,
                'file': file_contents(fname)
                }
            # NOTE(review): username and ID travel over plain http.
            try:
                data = _urlencode(post)
                if not isinstance(data, bytes):
                    data = data.encode('ascii')  # Python 3 wants bytes
                response = _urlopen('http://scripts.mit.edu/~6.02/currentsemester/submit_task.cgi',
                                    data).read()
                if not isinstance(response, str):
                    response = response.decode('utf-8', 'replace')
            except Exception as e:
                # best effort: report any failure in the next dialog
                response = 'Error\n'+str(e)

            if response.startswith('Error\n'):
                error = response[6:]  # retry, showing the server's message
            else:
                MessageDialog(root, message=response, title='Submission response')
                break
    finally:
        # always tear the Tk root down, even if something above raised
        root.destroy()