import numpy,os,wave import matplotlib.pyplot as p # This file has faster versions of samples_to_bits, transmit, and a difference # equation solver. Key issue is to avoid repeated memory allocation associated # incrementally increasing the length of lists, as that seems to run slowly in # python, even though that is often an elegant way to write a program. # set up for audio playback audio = None try: import ossaudiodev audio = 'oss' except ImportError: try: import pyaudio audio = 'pyaudio' except ImportError: pass def play(samples,sample_rate,gain=None): nsamples = len(samples) # compute appropriate gain if none supplied if gain is None: gain = 1.0/max(numpy.absolute(samples)) # convert -1V to 1V waveform to signed 16-bit ints asamples = numpy.zeros(nsamples,dtype=numpy.int16) numpy.multiply(samples,32767.0 * gain,asamples) offset = 0 # where we are in sending data nremaining = nsamples # send audio samples to output device if audio == 'oss': stream = ossaudiodev.open('w') fmt,chans,rate = stream.setparameters(ossaudiodev.AFMT_S16_LE,1,int(sample_rate)) assert rate==sample_rate,\ "%g is not a supported sample rate for audio playback" % sample_rate while nremaining > 0: can_send = min(stream.obuffree(),nremaining) if can_send > 0: end = offset + can_send stream.write(asamples[offset:end].tostring()) offset = end nremaining -= can_send stream.close() elif audio == 'pyaudio': dev = pyaudio.PyAudio() devinfo = dev.get_default_output_device_info() assert dev.is_format_supported(sample_rate, output_device = devinfo['index'], output_channels = 1, output_format = pyaudio.paInt16),\ "%g is not a supported sample rate for audio playback" % self.sample_rate stream = dev.open(int(sample_rate),1,pyaudio.paInt16,output=True) while nremaining > 0: can_send = min(stream.get_write_available(),nremaining) if can_send > 0: end = offset + can_send stream.write(asamples[offset:end].tostring(),num_frames=can_send) offset = end nremaining -= can_send stream.close() else: assert False,"Sorry, no audio device library could be found" # returns list of sampled_waveforms, one per channel. # Audio samples are in range -1.0 to +1.0 if gain=None is used def read_wavfile(filename,gain=None): assert os.path.exists(filename),"file %s doesn't exist" % filename wav = wave.open(filename,'rb') nframes = wav.getnframes() assert nframes > 0,"%s doesn't have any audio data!" % filename nchan = wav.getnchannels() sample_rate = wav.getframerate() sample_width = wav.getsampwidth() # see http://ccrma.stanford.edu/courses/422/projects/WaveFormat/ g = 1.0 if gain is None else gain if sample_width == 1: # data is unsigned bytes, 0 to 255 dtype = numpy.uint8 scale = g / 127.0 offset = -1.0 elif sample_width == 2: # data is signed 2's complement 16-bit samples (little-endian byte order) dtype = numpy.int16 scale = g / 32767.0 offset = 0.0 elif sample_width == 4: # data is signed 2's complement 32-bit samples (little-endian byte order) dtype = numpy.int32 scale = g / 2147483647.0 offset = 0.0 else: assert False,"unrecognized sample width %d" % sample_width outputs = [numpy.zeros(nframes,dtype=numpy.float64) for i in xrange(nchan)] count = 0 while count < nframes: audio = numpy.frombuffer(wav.readframes(nframes-count),dtype=dtype) end = count + (len(audio) / nchan) for i in xrange(nchan): outputs[i][count:end] = audio[i::nchan] count = end # scale data appropriately for i in xrange(nchan): numpy.multiply(outputs[i],scale,outputs[i]) if offset != 0: numpy.add(outputs[i],offset,outputs[i]) # apply auto gain if gain is None: maxmag = max([max(numpy.absolute(outputs[i])) for i in xrange(nchan)]) for i in xrange(nchan): numpy.multiply(outputs[i],1.0/maxmag,outputs[i]) return [outputs[i] for i in xrange(nchan)] def read_sound(filename): return read_wavfile(filename)[0] def play_sound(sound): play(sound,8000) def samples_to_bits(samples, thresh, offset, samples_per_bit): read_bits = [0]*(1 + len(samples)/samples_per_bit) s_index = offset i = 0 while (s_index) < len(samples): v = samples[s_index] if v > thresh: read_bits[i] = 1 else: read_bits[i] = 0 s_index = s_index + samples_per_bit i = i + 1 return read_bits[0:i] def transmit(bits,npreamble=0, npostamble=0, samples_per_bit=8, v0 = 0.0, v1 = 1.0, repeat = 1): """ generate sequence of voltage samples: bits: binary message sequence npreamble: number of leading v0 samples npostamble: number of trailing v0 samples samples_per_bit: number of samples for each message bit v0: voltage to output for 0 bits v1: voltage to output for 1 bits repeat: how many times to repeat whole shebang """ # Default to all v0's samples = [v0]*(npreamble + len(bits)*samples_per_bit + npostamble) index = npreamble for i in range(len(bits)): if bits[i] != 0: samples[index:index+samples_per_bit] = [v1]*samples_per_bit index = index + samples_per_bit samples = samples * repeat return samples # Not completely general difference equation solver: # yCoeffs[0]*y(n) + .. + yCoeffs[K]*y(n-K) = # xCoeffs[0]*x(n) + + xCoeffs[L]*x(n-L) # Uses initial conditions for y, y[n] = 0, n < 0, and assumes x[n] = 0, n < 0 # and assumes x[n] = xin[n] for n >= 0. # Returns y[n], 0 <= n < length(xin) in a numpy array. # Works even if xin is complex def de_solve(xin, yCoeffs, xCoeffs): assert(len(yCoeffs) == len(xCoeffs)) assert(yCoeffs[0] != 0.0) scale = 1.0/float(yCoeffs[0]) yCoeffs = numpy.array(yCoeffs[1:]) xCoeffs = numpy.array(xCoeffs) k = len(xCoeffs) x = xin[0] * numpy.zeros(k+1) # Make sure if xin complex, so is x x = numpy.append(x,xin) y = 0.0 * x # Make sure if xin complex, so is y for i in xrange(k+1,len(y)): xpast = sum(xCoeffs*x[i:i-k:-1]) ypast = sum(yCoeffs*y[i-1:i-k:-1]) y[i] = scale*(xpast - ypast) output = y[k+1:] return output def de_solver(yCoeffs, xCoeffs): def f(x): return de_solve(x, yCoeffs, xCoeffs) return f def channel1(input, noise=0.0): output = de_solve(input,[1,-2.7,2.43,-0.729],[0.0, 0.0, 0.0,1.0e-3]) noisevals = numpy.random.normal(0.0,1.0,size=len(output)) return output + noise*noisevals def channel2(input, noise=0.0): output = de_solve(input,[1.0, -3.0/2.0, 13.0/16.0],[5.0/16.0,0,0]) noisevals = numpy.random.normal(0.0,1.0,size=len(output)) return output + noise*noisevals def plot_freq_response(omega,mhejw,title): p.figure() p.plot(omega,mhejw) p.title("Magnitude of $|H(e^{j\Omega})|$ for "+title) p.xlabel("$\Omega$") p.grid() # Compute performance ratio for filter def filter_quality(stop_band,pass_band,omega,mag_hejw): maxstopband = 0.0 minpassband = 1.0 for i in xrange(len(omega)): w = omega[i] if w >= stop_band[0] and w <= stop_band[1]: maxstopband = max(maxstopband,mag_hejw[i]) if w >= pass_band[0] and w <= pass_band[1]: minpassband = min(minpassband,mag_hejw[i]) return minpassband/maxstopband