您现在的位置是:首页 >技术交流 >python接收遥测终端后端服务代码网站首页技术交流
python接收遥测终端后端服务代码
import socket
import threading
import SocketServer
import Queue
import time
queue_list = {}
client_list = []
socketdatalist = {}
#线程句柄,处理每个线程内收到的监控板数据
class ThreadedUDPRequestHandler(SocketServer.BaseRequestHandler):
def handle(self):
data = self.request[0]
socket = self.request[1]
#打印收到的监控板的ip和端口号
print "
服务器已连接 from: ", self.client_address[0]," port:",self.client_address[1]
#print "
服务器已连接 接收数据为: ", data
#释放系统资源
del socket
del data
#该类会启动线程句柄
class TYThreadingMixIn:
"""Mix-in class to handle each request in a new thread."""
# Decides how threads will act upon termination of the
# main process
daemon_threads = False
def process_request_thread(self,queue, client_address):
"""Same as in BaseServer but as a thread.
In addition, exception handling is done here.
"""
while True:
try:
#队列为阻塞型,如果没有数据则线程处理暂停
task = queue.get(block=True,timeout=10)
except Queue.Empty:
cur_thread = threading.current_thread()
log="Thread:name{} NO.{} client {} queue timeout over!".format(cur_thread.name, cur_thread.ident, client_address)
client_name = ("{}_{}".format(client_address[0], client_address[1]))
client_list.remove(client_name)
queue_list.pop(client_name)
print log
break
#print task
queue.task_done()
try:
self.finish_request(task, client_address)
self.shutdown_request(task)
except:
self.handle_error(task, client_address)
self.shutdown_request(task)
#启动一个新的线程,处理每个新增加的监控板地址
def process_request(self, request, client_address):
"""Start a new thread to process the request."""
client_name = ("{}_{}".format(client_address[0], client_address[1]))
#如果监控板的ip没有在list中,则在队列中新增一条监控板地址信息
if client_name not in client_list:
client_list.append(client_name)
queue_list[client_name] = Queue.Queue(100)
#启动一个线程,进行超时处理,如果10秒钟未接受到监控板的数据,则删除该监控板线程
t = threading.Thread(target = self.process_request_thread,
args = (queue_list[client_name], client_address),
name=client_name)
t.daemon = self.daemon_threads
t.start()
#在队列中假如新的监控板地址信息
queue_list[client_name].put(request)
def client(ip, port, message, name, cycle=0):
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
#sock.connect((ip, port))
try:
for i in range(0,cycle):
sock.sendto("{} cnt {}".format(message,i),(ip, port))
response = sock.recvfrom(65536)
print "Client{}:get: {}".format(name,response)
finally:
sock.close()
class TYThreadingUDPServer(TYThreadingMixIn, SocketServer.UDPServer): pass
#主函数
if __name__ == "__main__":
#绑定ip和端口,目前设定为绑定所有ip的11111端口
HOST, PORT = "0.0.0.0", 11111
server = TYThreadingUDPServer((HOST, PORT), ThreadedUDPRequestHandler)
ip, port = server.server_address
# 启动一个socket线程池
server_thread = threading.Thread(target=server.serve_forever)
# Exit the server thread when the main thread terminates
server_thread.daemon = True
server_thread.start()
print "Server loop running in thread:", server_thread.name
server_thread.join()
#进程等待,该主进程等待所有的线程结束
while True:
time.sleep(5)
try:
i=1
#for k,v in queue_list.iteritems():
# print i,":",k
# i=i+1
except:
server.shutdown()
server.server_close()
finally:
pass
#释放系统资源
server.shutdown()
server.server_close()