"""Cycle a Mark-10 force gauge through a list of target loads and log peak forces.

For every target load the script opens the gauge's serial port, runs the
requested number of load cycles, queries the peak tension and peak compression
after each cycle, and writes the collected rows to a timestamped CSV file.
"""
import csv
import glob
import os
import pdb
import re
import sys
import threading
import time
from time import gmtime, strftime, localtime

import serial
import serial.tools.list_ports

# Variable declaration with defaults.
loadForce = 0  # N
dwellTime = 1  # sec
cycles = 1
# Header row followed by one (tension, unit, compression, unit) row per cycle.
forceData = [('Tension', 'Unit', 'Compression', 'Unit')]
curCycle = 0
downTravel = False
mkCMD = {
    'pkT': '?PT',
    'pkC': '?PC',
    'clrP': 'CLR',
    'getMem': 'MEM',
    'clrMem': 'CLRMEM',
    'cur': '?',
    'newton': 'N',
}
unitSpd = {'lbF': 'e0010.0', 'N': 'e0005.0'}
mk = 5
test_array = []
# Shared serial port; configured and opened by commControl().
com = serial.Serial()

# Matches a decimal reading followed by its unit in a peak-query reply.
_PEAK_PATTERN = r"([-+]?\d*\.\d+) (lbF|N|gF|ozF|kgF)"


def _shared_port():
    """Return the module-level serial port object."""
    return com


def commControl():
    """Locate the Mark-10 port, then configure and open the shared ``com`` port.

    Every detected port is printed. If several descriptions match "mark-10"
    (case-insensitive), the last one listed is used.

    Raises:
        serial.SerialException: if no port description matches, or the port
            cannot be opened.
    """
    mkPort = None
    for p in serial.tools.list_ports.comports():
        print(p)
        if re.search("mark-10", p.description, re.IGNORECASE):
            mkPort = p.device
    if mkPort is None:
        raise serial.SerialException(
            'No Mark-10 device found on any serial port')

    # Configure the shared port in place so read_force()/serialComm() talk to
    # the port that is actually opened here.
    if com.is_open:
        com.close()
    com.baudrate = 115200
    com.parity = serial.PARITY_NONE
    com.bytesize = serial.EIGHTBITS
    com.stopbits = serial.STOPBITS_ONE
    com.timeout = 1
    com.port = mkPort
    com.open()


def read_force():
    """Send ``X`` on the shared port and return the first number in the reply.

    Returns:
        The first numeric token of the reply line as a string, or ``None``
        (after printing an error) when the port is not open.

    Raises:
        IndexError: if the reply line contains no number, e.g. an empty read
            after the 1 s timeout.
    """
    if not com.is_open:
        print('Port Error: COM not open')
        return None
    com.write(b'X')
    reply = com.readline().strip().decode()
    # NOTE(review): the integer alternative of this pattern carries no sign,
    # so a reply such as "-5" would yield "5" - confirm replies are decimals.
    numbers = re.findall(r"[-+]?\d*\.\d+|\d+", reply)
    return numbers[0]


def gauge_com(mkcom, com=None):
    """Send ``mkcom`` framed by ``/`` and ``\\`` lines.

    Args:
        mkcom: Command text to send, e.g. a value from ``mkCMD``.
        com: Open serial port to write to; defaults to the shared
            module-level port.
    """
    port = _shared_port() if com is None else com
    port.write('/ \r\n'.encode('ascii'))
    port.write(mkcom.encode('ascii'))
    port.write('\r\n'.encode('ascii'))
    port.write('\\ \r\n'.encode('ascii'))


def param(mk):
    """Prompt for dwell time and cycle count, then run every target load.

    Args:
        mk: Gauge selector; ``5`` picks the low-force load table, any other
            value picks the high-force table.
    """
    dwellTime = int(input("Enter dwell time in seconds: "))
    cycles = int(input("Enter number of cycles: "))
    if mk == 5:
        test_array = [0.1, 0.2, 0.4, 0.6, 0.8, 1.0, 1.2, 1.4,
                      1.6, 1.8, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5]
    else:
        test_array = [5, 7.5, 10, 12.5, 15, 20, 25, 30, 35]
    for target in test_array:
        serialComm(target, dwellTime, cycles)
        print(target)  # progress echo of the load that just finished


def _read_peak(command):
    """Send a peak query and return the first ``(value, unit)`` pair replied.

    Raises:
        IndexError: if the 20-byte reply holds no "<decimal> <unit>" pair.
    """
    gauge_com(command)
    reply = com.read(20).strip().decode()
    return re.findall(_PEAK_PATTERN, reply)[0]


def serialComm(loadForce, dwellTime, cycles):
    """Run ``cycles`` load cycles at ``loadForce`` and write the peaks to CSV.

    The output file is named ``<ddmmYYYY-HHMMSS>_F<load>.csv`` in the current
    directory, where ``<load>`` is ``loadForce`` with punctuation removed.

    Args:
        loadForce: Target force compared against the live gauge reading.
        dwellTime: Seconds to wait once the target force has been reached.
        cycles: Number of load cycles to run.
    """
    commControl()
    try:
        curTime = strftime("%d%m%Y-%H%M%S", localtime())
        regexF = re.sub(r'\ |\?|\.|\!|\/|\;|\:', '', str(loadForce))
        filename = curTime + '_F' + regexF + '.csv'

        # Single-letter commands: 'd' / 'u' / 's' presumably mean travel down /
        # travel up / stop, judging by the names and comments in this file -
        # TODO confirm against the test stand manual.
        for c in range(cycles):
            # Reset every cycle so the "move up" branch never reads an
            # unassigned flag.
            downTravel = False
            com.write(b'e0001.0')
            com.write(b'R')
            curForce = float(read_force())
            if curForce < loadForce:
                com.write(b'd')
                downTravel = True
            elif curForce > loadForce:
                com.write(b'u')
            else:
                print('Loading Error')
                break

            # Poll the live reading until the target force is reached.
            while downTravel:
                currentForce = float(read_force())
                if currentForce >= loadForce:
                    com.write(b's')
                    print('Preloading... Cycle ' + str(c))
                    downTravel = False

            time.sleep(dwellTime)
            com.write(b'e0075.0')
            com.write(b'u')
            time.sleep(1)  # wait 1 second for gauge to travel up
            com.write(b's')

            tension = _read_peak(mkCMD['pkT'])  # request peak tension
            time.sleep(2)
            com.reset_output_buffer()
            compression = _read_peak(mkCMD['pkC'])
            forceData.append(tension + compression)
            com.reset_output_buffer()

        # NOTE(review): forceData is module-level and never cleared, so each
        # CSV also contains the rows recorded by earlier serialComm() calls in
        # this process - confirm that is intended.
        with open(filename, 'w', newline='') as fp:
            csv.writer(fp).writerows(forceData)
    finally:
        # Release the port even when a read or parse fails mid-run.
        com.close()


def _prompt_gauge():
    """Ask for the gauge type until the answer is 5 or 50; return it as int."""
    prompt = '5 or 50 gauge: '
    while True:
        try:
            ver = int(input(prompt))
        except ValueError:
            ver = None
        if ver in (5, 50):
            return ver
        prompt = 'Try again. 5 or 50 gauge: '


def main():
    """Create the ``cleaning<N>`` output folder, run the test and report time."""
    start_time = time.time()
    cleaning = input('Number of cleanings: ')
    pathdir = 'cleaning' + cleaning
    os.makedirs(pathdir, exist_ok=True)
    origin = os.getcwd()
    os.chdir(pathdir)
    try:
        param(_prompt_gauge())
    finally:
        # Return to the starting directory even if the run fails.
        os.chdir(origin)
    elapsed_time = time.time() - start_time
    print('Data acquisition complete. Time elapsed: '
          + time.strftime("%H:%M:%S", time.gmtime(elapsed_time)))


if __name__ == "__main__":
    main()