# pyThread111924
11/9/2024
#PyQt5HMUSB071724.py
# oct 12, 2024
#C:\Users\admin\PycharmProjects\pyHMUSB_1
from PyQt5 import QtWidgets
from PyQt5.QtWidgets import QApplication,QMainWindow,QMessageBox,QDialog
from PyQt5.uic import loadUi
import sys
import time
from time import sleep
import threading #from threading import Thread
from threading import Event
from threading import Thread
import queue
# initilizee
q =queue.Queue() # fifo
# seriasl Port
import keyboard
import serial
# Global variable for RS232 read
Gloabal_RS232Read =0
#../
#
#from pyHMUSB import add2,OpenPort,PortClose
#from myModuleSerial import add
#import pyHMUSB
from pyHMUSB import *
class MainUIxy(QMainWindow):
def __init__(self):
super(MainUIxy, self).__init__()
loadUi("myTestQt5.ui", self) # Load the UI file
self.pushButtonXHome.clicked.connect(XHomeAxis)
self.XJOG_PB.clicked.connect(XJOGAxis)
self.ThreadON_PB.clicked.connect(Thread_ON)
self.ThreadOFF_PB.clicked.connect(Thread_OFF)
# create the event
#PortClose()
#serialPort.close()
#print(add(1,2))
# Seerial Module
baudrate = 115200
print(add2(7,4))
print(baudrate)
serialPort = serial.Serial(port='COM4', baudrate=baudrate, parity=serial.PARITY_NONE, bytesize=8, timeout=1,
stopbits=serial.STOPBITS_ONE)
def OpenSerialPort():
try:
serialPort.close()
serialPort.open()
except serial.SerialException:
print("done")
#serialPort.open()
#OpenPort()
#PortClose()
event = Event()
timerEnable =False
def Thread_ON():
OpenSerialPort()
print("thread ON")
t1 = Thread(target=myThreadRS232Function, args=(event,q))
t1.start()
#t1.join()
# block for a while
sleep(3)
timerEnable = True
def Thread_OFF():
print("thread OFF")
print(q.get())
print(q.get())
serialPort.close()
# stop the worker thread
print('Main stopping thread')
event.set()
# wait for the new thread to finish
t1.thread.join()
# define myThreadRS232Function
def myThreadRS232Function(event,q):
global Gloabal_RS232Read
while True:
Gloabal_RS232Read += 1
#print(f'{Gloabal_RS232Read}')
message = serialPort.readline()
length = len(message)
if (length > 0):
print(message)
q.put(message)
sleep(1)
if event.is_set():
break
def callback_function():
print("Event triggered!")
start_time = time.time()
interval = 2 # seconds
if(timerEnable == True):
current_time = time.time()
print("timerEnable")
if current_time - start_time >= interval:
callback_function()
start_time = current_time # Reset the timer
#time.sleep(1) # Check every second
def XHomeAxis():
serialString = ""
serialPort.write(bytearray('{"gc":"G28.2 X0"}''\r\n','ascii'))
print("test")
print(Gloabal_RS232Read)
while True:
try:
message = serialPort.readline()
print(message)
if keyboard.is_pressed('q'):
print("quit")
break
except KeyboardInterrupt:
break
XPosition =0.300
def XJOGAxis(): #F6
serialString = ""
global XPosition
XPosIncrement = 0.200
#round(XPosition, 2)
serialPort.write(bytearray('{"gc":"G1 F150 X'+str(round(XPosition,2)) +' Y0.000 A0.000"}''\r\n', 'ascii'))
#serialPort.write(bytearray('{"gc":"G1 F150 X0.400 Y0.000 A0.000"}''\r\n', 'ascii'))
#serialPort.write(bytearray('{"gc":"G28.2 X0"}''\r\n', 'ascii'))
XPosition = XPosition +XPosIncrement
print("XJOG-test")
while True:
try:
message = serialPort.readline()
print(message)
if keyboard.is_pressed('q'):
print("quit")
break
except KeyboardInterrupt:
break
if __name__ == "__main__":
app = QApplication(sys.argv)
ui = MainUIxy()
ui.show()
app.exec_()
print("done-1")
serialPort.close()
#REFERENCE
#https://superfastpython.com/stop-a-thread-in-python/
11/9/2024
#PyQt5HMUSB071724.py
# oct 12, 2024
#C:\Users\admin\PycharmProjects\pyHMUSB_1
from PyQt5 import QtWidgets
from PyQt5.QtWidgets import QApplication,QMainWindow,QMessageBox,QDialog
from PyQt5.uic import loadUi
import sys
#from threading import Thread
import threading
import time
from threading import Event
from time import sleep
from threading import Thread
# seriasl Port
import keyboard
import serial
# Global variable for RS232 read
Gloabal_RS232Read =0
#../
#
#from pyHMUSB import add2,OpenPort,PortClose
#from myModuleSerial import add
#import pyHMUSB
from pyHMUSB import *
class MainUIxy(QMainWindow):
def __init__(self):
super(MainUIxy, self).__init__()
loadUi("myTestQt5.ui", self) # Load the UI file
self.pushButtonXHome.clicked.connect(XHomeAxis)
self.XJOG_PB.clicked.connect(XJOGAxis)
self.ThreadON_PB.clicked.connect(Thread_ON)
self.ThreadOFF_PB.clicked.connect(Thread_OFF)
# create the event
#PortClose()
#print(add(1,2))
# Seerial Module
baudrate = 115200
print(add2(7,4))
print(baudrate)
def OpenSerialPort():
try:
serialPort = serial.Serial(port='COM5', baudrate=baudrate, parity=serial.PARITY_NONE, bytesize=8, timeout=1,stopbits=serial.STOPBITS_ONE)
serialPort.close()
serialPort.open()
except serial.SerialException:
print("done")
#serialPort.open()
#OpenPort()
#PortClose()
event = Event()
def Thread_ON():
OpenSerialPort()
print("thread ON")
t1 = Thread(target=myThreadRS232Function, args=(event,))
t1.start()
#t1.join()
# block for a while
sleep(3)
def Thread_OFF():
print("thread OFF")
# stop the worker thread
print('Main stopping thread')
event.set()
# wait for the new thread to finish
t1.thread.join()
# define myThreadRS232Function
def myThreadRS232Function(event):
global Gloabal_RS232Read
while True:
Gloabal_RS232Read += 1
#print(f'{Gloabal_RS232Read}')
message = serialPort.readline()
print(message)
#messageLen =len(message)
#print(messageLen)
sleep(1)
if event.is_set():
break
#domath = Thread(target=myThreadRS232Function)
def XHomeAxis():
serialString = ""
serialPort.write(bytearray('{"gc":"G28.2 X0"}''\r\n','ascii'))
print("test")
print(Gloabal_RS232Read)
while True:
try:
message = serialPort.readline()
print(message)
if keyboard.is_pressed('q'):
print("quit")
break
except KeyboardInterrupt:
break
XPosition =0.300
def XJOGAxis(): #F6
serialString = ""
global XPosition
XPosIncrement = 0.200
#round(XPosition, 2)
serialPort.write(bytearray('{"gc":"G1 F150 X'+str(round(XPosition,2)) +' Y0.000 A0.000"}''\r\n', 'ascii'))
#serialPort.write(bytearray('{"gc":"G1 F150 X0.400 Y0.000 A0.000"}''\r\n', 'ascii'))
#serialPort.write(bytearray('{"gc":"G28.2 X0"}''\r\n', 'ascii'))
XPosition = XPosition +XPosIncrement
print("XJOG-test")
while True:
try:
message = serialPort.readline()
print(message)
if keyboard.is_pressed('q'):
print("quit")
break
except KeyboardInterrupt:
break
if __name__ == "__main__":
app = QApplication(sys.argv)
ui = MainUIxy()
ui.show()
app.exec_()
print("done-1")
serialPort.close()
#REFERENCE
#https://superfastpython.com/stop-a-thread-in-python/
working directory
C:\Users\admin\PycharmProjects\pyHMUSB_1
# define myThreadRS232Function
def myThreadRS232Function(event):
global Gloabal_RS232Read
while True:
Gloabal_RS232Read += 1
#print(f'{Gloabal_RS232Read}')
message = serialPort.readline()
print(message)
sleep(1)
if event.is_set():
break
import threading
def say_hello(subject):
print(f'Hello, {subject}!')
# Output: Hello, World!
hello_thread = threading.Thread(target=say_hello, args=("World",))
hello_thread.start()
https://www.codecademy.com/resources/docs/python/threading/thread
Best way... Define a global variable, then change the variable in the threaded function. Nothing to pass in or retrieve back
from threading import Thread
# global var
random_global_var = 5
def myThreadFunction():
global random_global_var
random_global_var += 1
domath = Thread(target=myThreadFunction)
domath.start()
domath.join()
print(random_global_var)
# result: 6
https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread
# File: myModule-Serial.py
# DATE : 1026-2024
import threading
import time
import keyboard
done = False
def say_hello(subject):
print(f'Hello, {subject}!')
# Output: Hello, World!
def worker(text):
counter =2
while True:
time.sleep(1)
counter +=1
print(f'{text}: {counter}')
t1 =threading.Thread(target=worker,daemon=True,args=("ABC",))
#t2 =hello_thread = threading.Thread(target=say_hello, args=("World",))
t1.start()
t1.join()
print('done with this test')
input("Press enter to quit")
print('after press return')
done = True
thread stop
#PyQt5HMUSB071724.py
# oct 12, 2024
#C:\Users\admin\PycharmProjects\pyHMUSB_1
from PyQt5 import QtWidgets
from PyQt5.QtWidgets import QApplication,QMainWindow,QMessageBox,QDialog
from PyQt5.uic import loadUi
import sys
#from threading import Thread
import threading
import time
from threading import Event
from time import sleep
from threading import Thread
# seriasl Port
import keyboard
import serial
# Global variable for RS232 read
Gloabal_RS232Read =0
#../
#
#from pyHMUSB import add2,OpenPort,PortClose
#from myModuleSerial import add
#import pyHMUSB
from pyHMUSB import *
class MainUIxy(QMainWindow):
def __init__(self):
super(MainUIxy, self).__init__()
loadUi("myTestQt5.ui", self) # Load the UI file
self.pushButtonXHome.clicked.connect(XHomeAxis)
self.XJOG_PB.clicked.connect(XJOGAxis)
self.ThreadON_PB.clicked.connect(Thread_ON)
self.ThreadOFF_PB.clicked.connect(Thread_OFF)
# create the event
#print(add(1,2))
# Seerial Module
baudrate = 115200
print(add2(7,4))
print(baudrate)
serialPort = serial.Serial(port='COM5', baudrate=baudrate, parity=serial.PARITY_NONE, bytesize=8, timeout=1,stopbits=serial.STOPBITS_ONE)
#serialPort.open()
#serialPort.open()
#OpenPort()
#PortClose()
event = Event()
def Thread_ON():
print("thread ON")
t1 = Thread(target=myThreadRS232Function, args=(event,))
t1.start()
#t1.join()
# block for a while
sleep(3)
def Thread_OFF():
print("thread OFF")
# stop the worker thread
print('Main stopping thread')
event.set()
# wait for the new thread to finish
t1.thread.join()
# define myThreadRS232Function
def myThreadRS232Function(event):
global Gloabal_RS232Read
while True:
Gloabal_RS232Read += 1
print(f'{Gloabal_RS232Read}')
sleep(1)
if event.is_set():
break
#domath = Thread(target=myThreadRS232Function)
def XHomeAxis():
serialString = ""
serialPort.write(bytearray('{"gc":"G28.2 X0"}''\r\n','ascii'))
print("test")
print(Gloabal_RS232Read)
while True:
try:
message = serialPort.readline()
print(message)
if keyboard.is_pressed('q'):
print("quit")
break
except KeyboardInterrupt:
break
XPosition =0.300
def XJOGAxis(): #F6
serialString = ""
global XPosition
XPosIncrement = 0.200
#round(XPosition, 2)
serialPort.write(bytearray('{"gc":"G1 F150 X'+str(round(XPosition,2)) +' Y0.000 A0.000"}''\r\n', 'ascii'))
#serialPort.write(bytearray('{"gc":"G1 F150 X0.400 Y0.000 A0.000"}''\r\n', 'ascii'))
#serialPort.write(bytearray('{"gc":"G28.2 X0"}''\r\n', 'ascii'))
XPosition = XPosition +XPosIncrement
print("XJOG-test")
while True:
try:
message = serialPort.readline()
print(message)
if keyboard.is_pressed('q'):
print("quit")
break
except KeyboardInterrupt:
break
if __name__ == "__main__":
app = QApplication(sys.argv)
ui = MainUIxy()
ui.show()
app.exec_()
print("done-1")
serialPort.close()
#REFERENCE
#https://superfastpython.com/stop-a-thread-in-python/