import numpy,wave,os,sys,time ######################################################################### ### System Block Boxes (fast and slow) ######################################################################### def black_box(input, num, den, pad): def f(n): # Assumes input is zero before n = 0 and after zero ends if n >= 0 and n < len(input): return input[n] else: return 0.0 output_de = diff_eq(num,den,f) output = numpy.array([output_de(n) for n in range(len(input)+pad)]) return output # Solves difference equation: # yCoeffs[0]*y(n) + .. + yCoeffs[K]*y(n-K) = # xCoeffs[0]*x(n) + + xCoeffs[L]*x(n-L) # Uses initial conditions for y, y[n] = 0, n < 0, and assumes x[n] = 0, n < 0 def black_boxf(xin, yCoeffs, xCoeffs, pad=0): assert(len(yCoeffs) == len(xCoeffs)) assert(yCoeffs[0] != 0.0) scale = 1.0/float(yCoeffs[0]) yCoeffs = numpy.array(yCoeffs[1:]) xCoeffs = numpy.array(xCoeffs) k = len(xCoeffs) x = numpy.zeros(k+1+len(xin)) x[k+1:] = xin[:] y = numpy.zeros(len(x)) for i in xrange(k+1,len(y)): xpast = numpy.inner(xCoeffs,x[i:i-k:-1]) ypast = numpy.inner(yCoeffs,y[i-1:i-k:-1]) y[i] = scale*(xpast - ypast) output = y[k+1:] return output ################################################################################ ## usrp interface ################################################################################ import threading # transmit to tx channel using a separate thread class tx_helper(threading.Thread): def __init__(self,tx,samples,repeat=False): threading.Thread.__init__(self) self.tx = tx self.samples = samples self.nxmit = samples.size self.repeat = repeat def run(self): # start transmitter assert self.tx.start(),"usrp transmitter failed to start" offset = 0 # where we are in transmitting data while offset < self.nxmit: end = min(offset+131072,self.nxmit) data = self.samples[offset:end].tostring() bytes_sent,underrun = self.tx.write(data) if underrun: print 'usrp transmit: underrun...' offset += bytes_sent/2 if self.repeat and offset >= self.nxmit: print '1' offset = 0 try: import usrp_602 as usrp def tx_setup(sample_rate,chan,tx_pga_gain_db,frequency): # interp rate updated below once we know dac rate tx = usrp.usrp_tx(32) tx.stop() # We are only using one active channel!!! (jkw) assert tx.set_nchannels(1), \ "usrp_ir_channel: set_tx_nchannel failed with value 1" # configure which DACs to use tx.set_mux(0x0098 if chan == 0 else 0x9800) # set correct interpolation rate interp = int(tx.dac_rate/sample_rate) assert interp <= 512,\ "usrp_ir_channel: sorry, sample rate must be at least %g" % tx.dac_rate/512 tx.set_interp_rate(interp) interp_rate = tx.interp_rate # set PGA gain assert tx_pga_gain_db >= tx.pga_min and tx_pga_gain_db <= tx.pga_max,\ "usrp_ir_channel: tx_pga_gain_db must be in the range %g to %g" % (tx.pga_min,tx.pga_max) # DACs 0 and 1 share a gain setting, as do DACs 2 and 3 tx.set_pga(2*chan,tx_pga_gain_db) # set frequency of DUC assert tx.set_tx_freq(chan,frequency),\ "usrp_ir_channel: set_tx_freq failed with value %g" % frequency return tx def tx_data(preamble, samples, repeats, tx_gain): # adjust number of transmitted samples to be a multiple of 128 # since we have to transmit a multiple of 512 bytes and there are # 4 bytes xmitted per sample nsamples = len(samples) nxmit = (1 + repeats)*nsamples + preamble adjust = nxmit % 128 if adjust != 0: nxmit += 128 - adjust # Make preample longer to align the packet of data on 128 preamble = nxmit - (1 + repeats) * nsamples # Create 16 bit data for transmitter (2x for I and Q) tx_samples = numpy.zeros(2*nxmit,dtype=numpy.int16) # Set initial values to most negative value, so preamble sends a "zero" tx_samples[:] = - tx_gain*32767.0 # Scale so 0->1 maps to -tx_gain*(2^15-1)-> tx_gain*(2^15-1) scaled_samples = (samples * 2.0 - 1.0) * tx_gain * 32767.0 # Interleave preamble + samples to transmit on I and Q # Note: Must replicate data on I and Q, as transmitter modulator # initial phase is uncertain (may move I to Q). for i in range(repeats+1): offset = 2*preamble + i*2*nsamples tx_samples[offset:offset + 2*nsamples:2] = scaled_samples tx_samples[offset+1:offset+1+2*nsamples:2] = scaled_samples return (tx_samples,nxmit) def rcvr_setup(sample_rate,chan,rx_pga_gain_db,frequency,rx_adc_offset): # decimation rate updated below once we know adc rate rx = usrp.usrp_rx(250) rx.stop() # set correct decimation rate decim = int((rx.adc_rate/sample_rate)) assert decim <= 256,\ "usrp_ir_receive: sorry, sample rate must be at least %g" % rx.adc_rate/256 rx.set_decim_rate(decim) decim_rate = rx.decim_rate # configure which ADCs to use rx.set_mux(0x10101010 if chan == 0 else 0x32323232) # set adc offset, disable input buffer since we're dc coupled if chan == 0: rx.set_adc_offset(0,rx_adc_offset) rx.set_adc_offset(1,rx_adc_offset) rx.set_adc_buffer_bypass(0,1) rx.set_adc_buffer_bypass(1,1) elif chan == 1: rx.set_adc_offset(2,rx_adc_offset) rx.set_adc_offset(3,rx_adc_offset) rx.set_adc_buffer_bypass(2,1) rx.set_adc_buffer_bypass(3,1) # disable DC offset removal control loop rx.set_dc_offset_cl_enable(0x0,0x3 if chan == 0 else 0xC) # set PGA gain assert rx_pga_gain_db >= rx.pga_min and rx_pga_gain_db <= rx.pga_max,\ "usrp_receive: rx_pga_gain_db must be in the range %g to %g" % (rx.pga_min,rx.pga_max) # DACs 0 and 1 share a gain setting, as doD ACs 2 and 3 rx.set_pga(2*chan,rx_pga_gain_db) # set it for both ADCs rx.set_pga(2*chan+1,rx_pga_gain_db) # set frequency of DUC assert rx.set_rx_freq(chan,frequency),\ "usrp_receive: set_rx_freq failed with value %g" % frequency # Make sure the number of active channels is one!!!!! assert rx.set_nchannels(1),\ "usrp_receive: set_nchannel failed with value 1" return rx # Send, receive, or transceive a waveform through the IR channel. def usrp_ir_xcv(rx_nsamples, # Number samples to receive samples, # Samples to be xmited, [] if just rcv sample_rate=4e6,# Sample rate tx_gain=1.0, # transmit scaling factor tx_pga_gain_db=0, # usrp transmit gain rx_gain=1.0, # receive scaling factor rx_pga_gain_db=0, # usrp receive gain rx_adc_offset=0, # receive adc offset frequency=0, # usrp up-convert/down-convert frequency chan=1, # assume IR in "B" side of USR preamble=128*10, # Zero samples to transmit before data rx_extra=128*40, # Rcv more than requested in loopback t_repeats=0, r_repeats=0, t_repeat = False # Repeat transmitting forever if true ): # make sure we have a USRP plugged in if not usrp.usrp_find_device(0): assert False,"usrp_ir_channel: can't find USRP board %s" % board # tx setup if samples != []: tx = tx_setup(sample_rate,chan,tx_pga_gain_db,frequency) tx_samples,nxmit = tx_data(preamble, samples, t_repeats, tx_gain) # Setup the transmitter thread tx_thread = tx_helper(tx,tx_samples,repeat=t_repeat) # The MIT IR circuit uses Q channel's ADC q_samples = [] # If just transmitting, start transmitter if rx_nsamples == 0: tx_thread.start() # Otherwise, setup receiver and run else: rx = rcvr_setup(sample_rate,chan,rx_pga_gain_db,frequency,rx_adc_offset) # Add the preamble to number of received samples if transceiving nrcv_total = rx_nsamples*(1 + r_repeats) if samples != []: # Must be transceiving nrcv_total += preamble + rx_extra # Align on 512 byte boundaries adjust = nrcv_total % 128 if adjust != 0: nrcv_total += 128 - adjust q_samples = numpy.zeros(nrcv_total,dtype=numpy.float32) # Start the receiver assert rx.start(),"usrp receiver failed to start" # Start the transmitter thread if transmitting if samples != []: tx_thread.start() count = 0 while count < nrcv_total: # compute how many samples to read on this call want = min((nrcv_total-count)<<2,131072) # read 'em buffer,overrun = rx.read(want) if overrun == True: print "Buffer Overrun!!!" # deinterleave samples into I and Q buffers data = numpy.frombuffer(buffer,dtype=numpy.int16) got = data.size/2 if got > 0: end = count + got q_samples[count:end] = data[1::2] count = end # done, stop receiver rx.stop() # Scale and offset digital number to get a value between 0 and 1.0 q_samples *= rx_gain/32768.0 q_samples += 0.5 # Strip of a number of samples equal to the preamble, if used if samples != []: q_samples = q_samples[preamble:] # wait for transmitter thread to complete, then stop transmitter # unless the transmitter is repeating forever. if samples != [] and t_repeat == False: tx_thread.join() tx.stop() # Return received samples if any return q_samples except ImportError: def usrp_ir_xcv(rx_nsamples, samples,**keywords): raise NotImplementedError,"sorry, no usrp library is available" # Channel callable object. The parameters of the channel are: # channelid = '0' (fast channel), # '2' (medium channel with ringing) # '1' (very slow channel) # 'ir' (Infrared hardware channel, requires usrp ir hardware) # noise = float > 0 is the amplitude of added channel noise # For virtual channels only, error is generated if used with ir. # random_tails = integer > 0 is an upper bound on the random length of # preappended and postappended zeros. # # When a channel instance is called with an input (a numpy array of floating # point values), a numpy array is returned of length # len(preappended)+len(input)+ len(postappended) class channel: def __init__(self,channelid='0',noise=0.0,random_tails=0,use_normal=False): self.id = channelid self.noise = noise self.tails = random_tails self.use_normal = use_normal if self.id == 'ir': assert self.noise == 0.0, "No fake noise on real channel!" assert self.tails == 0, "No fake extra samples on a real channel!" elif self.id == '0': self.den = [1,-0.4] self.num = [0.42, 0.0] elif self.id == '1': self.den = [1,-2.7,2.43,-0.729] self.num = [0.0,0.0,0.0,0.001] elif self.id == '2': self.den = [1, -2.0/3.0, 13/16.0] self.num = [5.0/16.0,0,0] else: assert False, "unknown channel %s in transmit function" % self.id def __call__(self,input): if type(input) != type(numpy.zeros(1)): input = numpy.array(input) if self.id != 'ir': out = black_boxf(input,self.den, self.num,0) if self.tails > 0: delay = numpy.zeros(numpy.random.randint(self.tails)) pad = numpy.zeros(numpy.random.randint(self.tails)) out = numpy.append(delay,out) out = numpy.append(out,pad) if self.noise > 0: s = len(out) if self.use_normal: noisevals = numpy.random.normal(scale=self.noise,size=s) else: noisevals = numpy.random.triangular(-1,0,1,size=s) noisevals *= self.noise out = out + noisevals else: out = usrp_ir_xcv(len(input),input) return out def ir_tran(input,repeat=False): usrp_ir_xcv(0,input,t_repeat=repeat) def ir_rcv(num_samples): return usrp_ir_xcv(num_samples,[])