12/6
#file: pyHM120624
import serial
import threading
def read_from_port(ser):
    """Continuously read lines from a serial port and echo them to stdout.

    The loop never returns on its own; it ends only when ``ser.readline()``
    raises or the process exits, so it is meant to run in a background
    thread.

    Args:
        ser: Object exposing ``readline()`` that returns ``bytes``.
    """
    while True:
        # errors="replace": line noise or a wrong baud rate yields bytes that
        # are not valid UTF-8.  A strict decode would raise
        # UnicodeDecodeError and terminate the reader thread.
        reading = ser.readline().decode(errors="replace")
        # Nothing is printed when readline() returned no data.
        if reading:
            print(f'\nrx: ,{reading}\n')
if __name__ == '__main__':
    # Replace with your port and baud rate
    ser = serial.Serial(port='COM4', baudrate=115200, timeout=1)

    # Reader runs as a daemon thread so it exits when the main thread exits.
    thread = threading.Thread(
        target=read_from_port,
        args=(ser,),
        daemon=True,
    )
    thread.start()

    # The main thread stays free for other work; as an example it forwards
    # every line typed by the user to the serial port.
    while True:
        ser.write(input("Enter data to send: ").encode())
12/6
#file: pyHM120624
# https://jackwhitworth.com/blog/return-values-from-a-python-thread/
from threading import Thread
import time
import random
class ReturnableThread(Thread):
    """Thread subclass that keeps the target's return value.

    After the thread has finished, the value returned by ``target`` is
    available as ``self.result``.  ``result`` is ``None`` until then, and it
    stays ``None`` if the target raises or itself returns ``None``, so use
    ``join()`` (not the value of ``result``) to detect completion.
    """

    def __init__(self, target, args=(), kwargs=None):
        """Store the callable and the arguments it will be called with.

        Args:
            target: Callable executed in the new thread.
            args: Positional arguments passed to ``target`` (default: none).
            kwargs: Keyword arguments passed to ``target`` (default: none).
        """
        Thread.__init__(self)
        self.target = target
        self.args = tuple(args)
        # A fresh dict per instance avoids a shared mutable default.
        self.kwargs = {} if kwargs is None else dict(kwargs)
        self.result = None

    def run(self) -> None:
        """Call the target and remember what it returned."""
        self.result = self.target(*self.args, **self.kwargs)
# Returns a random number after a delay (1 second by default)
def randomNumber(delay: float = 1, low: int = 0, high: int = 100) -> int:
    """Sleep for ``delay`` seconds, then return a random integer.

    Args:
        delay: Seconds to sleep before producing the number (default 1).
        low: Smallest value that may be returned (default 0).
        high: Largest value that may be returned, inclusive (default 100).

    Returns:
        A random integer ``n`` with ``low <= n <= high``.
    """
    time.sleep(delay)
    return random.randint(low, high)
# Start a new thread
newThread = ReturnableThread(target=randomNumber)
newThread.start()
# Wait for the thread to finish.  join() blocks without busy-polling and
# also returns when the target raised or returned None; polling on
# "result is None" would loop forever in both of those cases.
newThread.join()
11/29
Starting and Stopping Python Threads With Events in Python Threading Module.
https://www.youtube.com/watch?v=j_envuVVBNw&t=4s
https://www.youtube.com/watch?v=j_envuVVBNw&t=4s
https://www.instructables.com/Starting-and-Stopping-Python-Threads-With-Events-i/
'''
pyHM_Rport
11/27/2024
Task: read serial port
'''
# import items
# QT Designer interface
import sys
from PyQt5 import QtWidgets
from PyQt5.QtWidgets import QApplication,QMainWindow,QMessageBox,QDialog
from PyQt5.uic import loadUi
#...threading
import threading
from threading import Event
from threading import Thread
import queue #FIFO Lib
import datetime
import time
from time import sleep
# .. initialize shared state
event = Event()  # starts out not set

message2 = "hello"
q = queue.Queue()  # FIFO
# Pre-load the queue with two copies of the greeting.
q.put(message2)
q.put(message2)

# .. variables
# Module-level counter.  No ``global`` statement is needed here: a name
# assigned at module level is already global.
Gloabal_RS232Read = 1
# define myThreadRS232Function
def myThreadRS232Function():
    """Worker loop for the RS232 reader thread (serial read still stubbed).

    On each pass it increments the module-level counter
    ``Gloabal_RS232Read``, prints it, waits up to 4 seconds and then
    measures a placeholder message.  The loop ends once the module-level
    ``event`` is set, so that ``Thread.join()`` on this worker can return.
    """
    global Gloabal_RS232Read
    print('myThreadRS232Function')
    # def myThreadRS232Function(event, q):
    # Checking the stop event (instead of ``while True``) gives the thread a
    # way to finish.
    while not event.is_set():
        Gloabal_RS232Read += 1
        print(f' count= {Gloabal_RS232Read}')
        # Event.wait() acts as an interruptible sleep: it returns True as
        # soon as the event is set, rather than after the full 4 seconds.
        if event.wait(4):
            break
        # message = serialPort.readline()
        message = '12345'  # hard-coded stand-in while the read is disabled
        length = len(message)
        if length > 0:
            # Message handling is not implemented yet; the explicit ``pass``
            # keeps this otherwise empty branch valid Python.
            # print(type(message))  # converting
            # print(f'length = {length}')
            pass
# Qt interface (GUI)
class MainUIxy(QMainWindow):
    """Main window whose widgets are loaded from ``myTestQt5.ui``."""

    def __init__(self):
        super().__init__()
        loadUi("myTestQt5.ui", self)  # Load the UI file

        # Connect each push button's ``clicked`` signal to its module-level
        # callback, in the same order as the buttons are listed here.
        button_slots = (
            ("pushButtonXHome", qt1XHomeAxis),
            ("XJOG_PB", qt2XJOGAxis),
            ("ThreadON_PB", qt3Thread_ON),
            ("ThreadOFF_PB", qt4Thread_OFF),
            ("PBStartTimer", qt5StarTimerCallBack),
        )
        for widget_name, slot in button_slots:
            getattr(self, widget_name).clicked.connect(slot)
# QT Callback
def qt1XHomeAxis():
    """Callback for the X-home button; it only prints a trace line."""
    trace = "1XHomeAxis\n"
    print(trace)
def qt2XJOGAxis():
    """Callback for the X-jog button; it only prints a trace line."""
    trace = "2XJOG-test reqested \n"
    print(trace)
# Handle of the worker thread started by qt3Thread_ON; None until then.
# It lives at module level because qt4Thread_OFF needs the same object.
t1 = None


def qt3Thread_ON():
    """Callback for the thread-ON button: start the RS232 worker thread.

    Does nothing except report it when a worker is already running, so that
    pressing the button twice does not lose the handle of the first thread.
    """
    global t1
    print("3thread requested\n")
    # OpenSerialPort()
    if t1 is not None and t1.is_alive():
        print("thread already running\n")
        return
    # Clear a stop request left over from an earlier OFF press.
    event.clear()
    # daemon=True: a worker that is still looping must not keep the process
    # alive after the main thread has finished.
    t1 = Thread(target=myThreadRS232Function, daemon=True)
    # t1 = Thread(target=myThreadRS232Function, args=(event, q))
    t1.start()
    # t1.join()
    print("thread requested done\n")


def qt4Thread_OFF():
    """Callback for the thread-OFF button: request a stop and wait briefly.

    Sets the module-level ``event`` and joins the worker with a timeout.
    """
    print("4thread OFF\n")
    if t1 is None:
        print("no thread to stop\n")
        return
    event.set()
    # Bounded wait: this callback is triggered by a button click, so an
    # unbounded join() on a worker that never finishes would block it
    # forever.
    t1.join(timeout=5)
    if t1.is_alive():
        print('thread still running')
    else:
        print('thread done')
def qt5StarTimerCallBack():
    """Callback for the start-timer button; it only prints a trace line."""
    trace = "5StatTimerCallBack"
    print(trace)
# Running the app
if __name__ == "__main__":
    app = QApplication(sys.argv)
    ui = MainUIxy()
    ui.show()
    # Hand the event loop's return value to sys.exit() so it becomes the
    # process exit status instead of being discarded.
    sys.exit(app.exec_())
../ off
# Print the two messages that were queued during initialisation.
# Note: q.get() blocks when the queue is empty.
print(q.get())
print(q.get())
#timer.join()
#serialPort.close()
# stop the worker thread
print('Main stopping thread')
event.set()
# wait for the new thread to finish
# (threading.Thread has no ``.thread`` attribute; join() is called on the
# Thread object itself.)
t1.join()
# Wait for the timer to finish
#timer.join()
../
# Draft of a worker-loop body kept between the "../" markers.
# NOTE(review): there is no enclosing ``def`` line and the indentation is
# missing, so which statements belong to the ``while`` / ``if`` bodies cannot
# be read off this text. Confirm the nesting before reusing it.
global Gloabal_RS232Read
print("thread Active\n")
# The loop has no exit condition.
while True:
Gloabal_RS232Read += 1
print(f'{Gloabal_RS232Read}')
time.sleep(4)
# The serial read is commented out; ``message`` is a hard-coded placeholder.
#message = serialPort.readline()
message ='12345'
length = len(message)
if (length > 0):
#print(type(message)) # converting
print(f'length = {length}')
# Everything below is disabled code (decode / JSON parsing / queue hand-off).
#print(length)
#output = codecs.decode(message) # display # outputprint(type(output))
#print('Output: ' + output)
#x = output.count('sr')
#print(x)
#if y > 0:
# convert to dic, convert python string to dictionaries
#json_string = output
#dictionary = json.loads(json_string)
#print(dictionary['sr']['posx'])
#print(f"myThread: {message} ok")
#q.put(message)
../