import numpy, pylab, time
# convert nbits of int into bit sequences, lsb first
def int_to_bits(n,nbits=8):
return [(n >> i) & 1 for i in xrange(nbits)]
# encode data into sequence of samples
# prefix = sequence of bits that appear before data bits
# postfix = sequence of bits that appear after data bits
# nbits = number of data bits to encode (LSB first)
# nsamples = number of samples to generate for each bit
def encode_data(data,prefix=[1],postfix=[0],nbits=8):
# start with prefix
result = prefix[:]
# encode data LSB first
result.extend(int_to_bits(data,nbits=8))
# end with postfix bits
result.extend(postfix)
return result
# encode string. See encode_data for details...
def encode_string(s,prefix=[1],postfix=[0],nbits=8):
result = [0]*5 # start with some zeros
for ch in s:
result.extend(encode_data(ord(ch),prefix=prefix,postfix=postfix,nbits=nbits))
result += [0]*5 # end with some zeros
return result
# convert digital sample sequence into voltages, upsample, add noise
# result is numpy array
def transmit(seq,vlow=-0.5,vhigh=0.5,upsample=8,ntaps=101,bw=.25,nmag=0.1):
ndata = len(seq)*upsample
samples = [vlow]*ntaps
vlast = vlow
for s in seq:
v = vlow if s == 0 else vhigh
dv = v - vlast
samples.append(dv*0.5 + vlast)
samples.append(dv*0.75 + vlast)
samples.append(dv*0.875 + vlast)
samples.extend([v]*(upsample-3))
vlast = v
voltages = numpy.fromiter(samples,dtype=numpy.float)
taps = compute_taps(ntaps,bw)
start = 3*ntaps/2
filtered = numpy.convolve(voltages,taps)[start:start+ndata]
noise = numpy.random.triangular(-nmag,0,nmag,size=ndata)
return filtered+noise
# low-pass filter taps, cutoff is fraction of sample rate
def compute_taps(ntaps,cutoff,gain=1.0):
order = float(ntaps - 1)
# hamming window
window = [0.53836 - 0.46164*numpy.cos((2*numpy.pi*i)/order)
for i in xrange(ntaps)]
fc = float(cutoff)
wc = 2 * numpy.pi * fc
middle = (ntaps - 1)/2
taps = [0.0] * ntaps
fmax = 0 # for low pass, gain @ DC = 1.0
for i in xrange(ntaps):
if i == middle:
coeff = (wc/numpy.pi) * window[i]
fmax += coeff
else:
n = i - middle
coeff = (numpy.sin(n*wc)/(n*numpy.pi)) * window[i]
fmax += coeff
taps[i] = coeff
gain = gain / fmax
for i in xrange(ntaps):
taps[i] *= gain
return taps
# convert bit sequence (lsb first) to an int
def bits_to_int(bits):
result = 0
for i in xrange(len(bits)):
result += bits[i] * (1 << i)
return result
# 8b/10 encoder
# two encoder tables: 5b/6b and 3b/4b
# each entry is a tuple of ints
# the two ints of each entry should be the same, or be complements
# RD is -1: use tuple[0]; if tuple[0]!=tuple[1], set RD to +1
# RD is +1: use tuple[1]; if tuple[0]!=tuple[1], set RD to -1
table_5b_6b = [
(0x39,0x06), # D.00
(0x2E,0x11), # D.01
(0x2D,0x12), # D.02
(0x23,0x23), # D.03
(0x2B,0x14), # D.04
(0x25,0x25), # D.05
(0x26,0x26), # D.06
(0x07,0x38), # D.07
(0x27,0x18), # D.08
(0x29,0x29), # D.09
(0x2A,0x2A), # D.10
(0x0B,0x0B), # D.11
(0x2C,0x2C), # D.12
(0x0D,0x0D), # D.13
(0x0E,0x0E), # D.14
(0x3A,0x05), # D.15
(0x36,0x09), # D.16
(0x31,0x31), # D.17
(0x32,0x32), # D.18
(0x13,0x13), # D.19
(0x34,0x34), # D.20
(0x15,0x15), # D.21
(0x16,0x16), # D.22
(0x17,0x28), # D.23
(0x33,0x0C), # D.24
(0x19,0x19), # D.25
(0x1A,0x1A), # D.26
(0x1B,0x24), # D.27
(0x1C,0x1C), # D.28
(0x1D,0x22), # D.29
(0x1E,0x21), # D.30
(0x35,0x0A), # D.31
]
# index by (byte >> 5) & 0x7
# use D.x.A7 when
# if RD is -1, and x is 17, 18, 20
# if RD is +1, and x 11, 13, 14
table_3b_4b = [
(0xD,0x2), # D.x.0
(0x9,0x9), # D.x.1
(0xA,0xA), # D.x.2
(0x3,0xC), # D.x.3
(0xB,0x4), # D.x.4
(0x5,0x5), # D.x.5
(0x6,0x6), # D.x.6
(0x7,0x8), # D.x.P7
(0xE,0x1), # D.x.A7
]
# 10-bit sync to use for RD==-1, RD==+1:
sync = (0x17C,0x283)
# (sync,updated rd) = encode_sync_8b10b(rd)
def encode_sync_8b10b(rd):
if rd == -1:
return (sync[0],1)
else:
return (sync[1],-1)
# (10b,updated rd) = encode_8b_10b(8b,rd)
def encode_data_8b10b(ch,rd):
# encode low-order 6 bits
x = ch & 0x1F
x_6b = table_5b_6b[x]
if rd == -1:
xcode = x_6b[0]
if x_6b[0] != x_6b[1]: rd = 1
else: # rd == 1
xcode = x_6b[1]
if x_6b[0] != x_6b[1]: rd = -1
# encode high-order 3 bits
y = (ch >> 5) & 0x7
y_4b = table_3b_4b[y]
if rd == -1:
if y == 7 and x in (17,18,20):
# use D.x.A7
y_4b = table_3b_4b[8]
ycode = y_4b[0]
if y_4b[0] != y_4b[0]: rd = 1
else: # rd == 1
if y == 7 and x in (11,13,14):
# use D.x.A7
y_4b = table_3b_4b[8]
ycode = y_4b[1]
if y_4b[0] != y_4b[0]: rd = -1
return((ycode << 6) + xcode,rd)
# encode string with 8b/10b inserting syncs every so often
def encode_string_8b10b(s,sync_interval = 16):
result = [0,1,1]*5 # start with some garbage
count = 0
rd = -1 # running disparity starts at -1
for ch in s:
if (count % sync_interval) == 0:
sync,rd = encode_sync_8b10b(rd)
result.extend(int_to_bits(sync,10))
code,rd = encode_data_8b10b(ord(ch),rd)
result.extend(int_to_bits(code,10))
count += 1
result += [0]*5 # end with some zeros
return result
# build the reverse lookup table by encoding each character
# with both possible running disparities and then filling
# in the appropriate entries
table_10b_8b = [None] * 1024
for ch in xrange(256):
for rd in (-1,1):
code = encode_data_8b10b(ch,rd)[0]
#print "0x%x (D.%02x.%x) encodes as 0x%03x when rd=%d" % (ch,ch & 0x1F, ch >> 5,code,rd)
assert table_10b_8b[code] is None or table_10b_8b[code]==ch,\
"Oops, duplicate entry in table_10b_8b"
table_10b_8b[code] = ch
long_message = """
Fourscore and seven years ago our fathers brought forth on this
continent a new nation, conceived in liberty, and dedicated to the
proposition that all men are created equal.
Now we are engaged in a great civil war, testing whether that nation,
or any nation, so conceived and so dedicated, can long endure. We are
met on a great battle-field of that war. We have come to dedicate a
portion of that field as a final resting place for those who here
gave their lives that that nation might live. It is altogether fitting
and proper that we should do this.
But, in a larger sense, we cannot dedicate... we cannot
consecrate... we cannot hallow... this ground. The brave men, living
and dead, who struggled here, have consecrated it far above our poor
power to add or detract. The world will little note nor long remember
what we say here, but it can never forget what they did here. It is
for us, the living, rather, to be dedicated here to the unfinished work
which they who fought here have thus far so nobly advanced. It is
rather for us to be here dedicated to the great task remaining before
us... that from these honored dead we take increased devotion to that
cause for which they gave the last full measure of devotion; that we
here highly resolve that these dead shall not have died in vain; that
this nation, under God, shall have a new birth of freedom; and that
government of the people, by the people, for the people, shall not
perish from the earth.
November 19, 1863
"""
# tester for task 1
def task1_test(f):
"testjig for Lab #1, Task #1 -- expects your plotting function as an argument"
data = transmit(encode_string('hi there'))
f(data)
# tester for task 2
def task2_test(f,message):
"testjig for Lab #1, Task #2 -- expects your receive function and a string as arguments"
data = transmit(encode_string(message))
result = f(data)
if result != message:
print 'expected "%s", got "%s"' % (message,result)
else:
print 'received message is "%s"' % result
# tester for task 3
def task3_test(f,message):
"testjig for Lab #1, Task #3 -- expects your receive function and a string as arguments"
data = transmit(encode_string_8b10b(message))
result = f(data)
if result != message:
print 'expected "%s", got "%s"' % (message,result)
else:
print 'received message is "%s"' % result
__all__ = ['task1_test','task2_test','task3_test',
'bits_to_int','table_10b_8b','long_message']