py-聊天室(kivy框架)
学了一段时间python,也做了一些小脚本,但没学一个GUI框架是一个不完整的学习路线。然后就找了找python的框架,大多数用的都是Tkinter,但没看到做的好看的,所以就用了kivy,不过kivy的国内教程实在是少之又少,国外用的多点。
1.kivy简介
Kivy是一个开源工具包能够让使用相同源代码创建的程序能跨平台运行。它主要关注创新型用户界面开发,如:多点触摸应用程序。Kivy还提供一个多点触摸鼠标模拟器。当前支持的平台包括:Linux、Windows、Mac OS X和Android。Kivy拥有能够处理动画、缓存、手势、绘图等功能。它还内置许多用户界面控件如:按纽、摄影机、表格、Slider和树形控件等。(所以我才用它,不仅跨多平台,还画面好、功能多。 ( ̄﹁ ̄) )
开始正题,这个聊天室项目当然不是这个仅仅看了几天官方文档的流涨自己做出来的,基本是按着教程一步一步来的,所以以后如果还进一步学习的话,会在该基础上添加新功能和优化界面(这个界面做的真不乍得,有一说一)。
2.代码
main.py
import kivy from kivy.app import App from kivy.uix.gridlayout import GridLayout from kivy.uix.label import Label from kivy.uix.textinput import TextInput from kivy.uix.button import Button from kivy.uix.screenmanager import ScreenManager,Screen from kivy.clock import Clock from kivy.core.window import Window from kivy.uix.scrollview import ScrollView import socket_client import os import sys kivy.require('1.11.1') # 导入字体(宋体)解决kivy的中文乱码问题 # font1 = kivy.resources.resource_find("./font/simsun.ttc") font1 = kivy.resources.resource_find("simsun.ttc") class ScrollableLable(ScrollView): def __init__(self,**kwargs): super().__init__(**kwargs) self.layout=GridLayout(cols=2,size_hint_y=None) self.add_widget(self.layout) self.chat_history=Label(size_hint_y=None,markup=True) self.scroll_to_point=Label() self.layout.add_widget(self.chat_history) self.layout.add_widget(self.scroll_to_point) # 更新chat信息 def update_chat_history(self,message): self.chat_history.text+='n'+message # 设置chat字体 self.chat_history.font_name=font1 self.layout.height=self.chat_history.texture_size[1]+15 self.chat_history.height=self.chat_history.texture_size[1] self.chat_history.text_size=(self.chat_history.width*0.98,None) self.scroll_to(self.scroll_to_point) # 改变窗口时更新窗口方法 def update_chat_history_layout(self,_=None): self.layout.height=self.chat_history.texture_size[1]+15 self.chat_history.height=self.chat_history.texture_size[1] self.chat_history.text_size=(self.chat_history.width*0.98,None) # 连接页 class ConnectPage(GridLayout): def __init__(self,**kwargs): super().__init__(**kwargs) self.cols= 2 # 存储输入字段信息 if os.path.isfile("prev_details.txt"): with open("prev_details.txt","r") as f: d=f.read().split(",") prev_ip=d[0] prev_port=d[1] prev_username=d[2] else: prev_ip="" prev_port="" prev_username="" # elements for page self.add_widget(Label(text='IP:')) self.ip=TextInput(text=prev_ip,multiline=False) self.add_widget(self.ip) self.add_widget(Label(text='Port:')) self.port=TextInput(text=prev_port,multiline=False) self.add_widget(self.port) self.add_widget(Label(text='Username:')) self.username=TextInput(text=prev_username,multiline=False,font_name=font1) self.add_widget(self.username) # 添加页面跳转按钮 self.join=Button(text="Join") # 添加按钮事件 self.join.bind(on_press=self.join_button) self.add_widget(Label()) self.add_widget(self.join) def join_button(self,instance): port=self.port.text ip=self.ip.text username=self.username.text with open("prev_details.txt","w") as f: f.write(f"{ip},{port},{username}") info=f"Attempting to join {ip}:{port} as {username}" # 将首页的info信息存入info页 chat_app.info_page.update_info(info) # 跳转页面 chat_app.screen_manager.current="Info" Clock.schedule_once(self.connect,1) def connect(self,_): port=int(self.port.text) ip=self.ip.text username=self.username.text if not socket_client.connect(ip,port,username,show_error): return chat_app.create_chat_page() chat_app.screen_manager.current="Chat" class InfoPage(GridLayout): def __init__(self,**kwargs): super().__init__(**kwargs) # element for page self.cols= 1 self.message=Label(halign="center",valign="middle",font_size=30,font_name=font1) # self.message=Label(halign="center",valign="middle",font_size=30) # 绑定事件 self.message.bind(width=self.update_text_width) self.add_widget(self.message) def update_info(self,message): self.message.text=message def update_text_width(self,*_): self.message.text_size=(self.message.width*0.9,None) class ChatPage(GridLayout): def __init__(self,**kwargs): super().__init__(**kwargs) # element for page self.cols= 1 self.row=2 # ScrollableLable为history(历史记录) self.history=ScrollableLable(height=Window.size[1]*0.9,size_hint_y=None) self.add_widget(self.history) self.new_message=TextInput(width=Window.size[0]*0.8,size_hint_x=None,multiline=False,font_name=font1) self.send=Button(text="Send") self.send.bind(on_press=self.send_message) bottom_line=GridLayout(cols=2) bottom_line.add_widget(self.new_message) bottom_line.add_widget(self.send) self.add_widget(bottom_line) Window.bind(on_key_down=self.on_key_down) Clock.schedule_once(self.focus_text_input,1) # 监听 socket_client.start_listening(self.incoming_message,show_error) #解决窗口变化问题 self.bind(size=self.adjust_fields) def adjust_fields(self,*_): # 控制chat界面的大小 if Window.size[1]*0.1<50: new_height=Window.size[1]-50 else: new_height=Window.size[1]*0.9 self.history.height=new_height # 下面零件的大小 if Window.size[0]*0.2<100: new_width=Window.size[0]-100 else: new_width=Window.size[0]*0.8 self.new_message.width=new_width #每次改变窗口都调用更新方法 Clock.schedule_once(self.history.update_chat_history_layout,0.01) def on_key_down(self,instance,keyboard,keycode,text,modifiers): if keyboard==40: self.send_message(None) def send_message(self,_): message=self.new_message.text self.new_message.text="" if message: self.history.update_chat_history(f"[color=dd2020]{chat_app.connect_page.username.text}[/color] > {message}") socket_client.send(message) Clock.schedule_once(self.focus_text_input,0.1) def focus_text_input(self,_): self.new_message.focus=True def incoming_message(self,username,message): self.history.update_chat_history(f"[color=20dd20]{username}[/color] > {message}") print("send a message!!!") # self.add_widget(Label(text="Hey at least it worked up to this point!!!!")) class ChatApp(App): def build(self): # 页面控制 self.screen_manager=ScreenManager() # 首页 self.connect_page=ConnectPage() screen=Screen(name="Connect") screen.add_widget(self.connect_page) self.screen_manager.add_widget(screen) # info页 self.info_page=InfoPage() screen=Screen(name="Info") screen.add_widget(self.info_page) self.screen_manager.add_widget(screen) return self.screen_manager def create_chat_page(self): self.chat_page=ChatPage() screen=Screen(name="Chat") screen.add_widget(self.chat_page) self.screen_manager.add_widget(screen) def show_error(message): chat_app.info_page.update_info(message) # 跳转页面 chat_app.screen_manager.current="Info" Clock.schedule_once(sys.exit,10) if name == '__main__': chat_app=ChatApp() chat_app.run()
socket_server.py
import socket import select HEADER_LENGTH = 10 IP = "127.0.0.1" PORT = 1234 # Create a socket # socket.AF_INET - address family, IPv4, some otehr possible are AF_INET6, AF_BLUETOOTH, AF_UNIX # socket.SOCK_STREAM - TCP, conection-based, socket.SOCK_DGRAM - UDP, connectionless, datagrams, socket.SOCK_RAW - raw IP packets server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # SO_ - socket option # SOL_ - socket option level # Sets REUSEADDR (as a socket option) to 1 on socket server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # Bind, so server informs operating system that it's going to use given IP and port # For a server using 0.0.0.0 means to listen on all available interfaces, useful to connect locally to 127.0.0.1 and remotely to LAN interface IP server_socket.bind((IP, PORT)) # This makes server listen to new connections server_socket.listen() # List of sockets for select.select() sockets_list = [server_socket] # List of connected clients - socket as a key, user header and name as data clients = {} print(f'Listening for connections on {IP}:{PORT}...') # Handles message receiving def receive_message(client_socket): try: # Receive our "header" containing message length, it's size is defined and constant message_header = client_socket.recv(HEADER_LENGTH) # If we received no data, client gracefully closed a connection, for example using socket.close() or socket.shutdown(socket.SHUT_RDWR) if not len(message_header): return False # Convert header to int value message_length = int(message_header.decode('utf-8').strip()) # Return an object of message header and message data return {'header': message_header, 'data': client_socket.recv(message_length)} except: # If we are here, client closed connection violently, for example by pressing ctrl+c on his script # or just lost his connection # socket.close() also invokes socket.shutdown(socket.SHUT_RDWR) what sends information about closing the socket (shutdown read/write) # and that's also a cause when we receive an empty message return False while True: # Calls Unix select() system call or Windows select() WinSock call with three parameters: # - rlist - sockets to be monitored for incoming data # - wlist - sockets for data to be send to (checks if for example buffers are not full and socket is ready to send some data) # - xlist - sockets to be monitored for exceptions (we want to monitor all sockets for errors, so we can use rlist) # Returns lists: # - reading - sockets we received some data on (that way we don't have to check sockets manually) # - writing - sockets ready for data to be send thru them # - errors - sockets with some exceptions # This is a blocking call, code execution will "wait" here and "get" notified in case any action should be taken read_sockets, _, exception_sockets = select.select(sockets_list, [], sockets_list) # Iterate over notified sockets for notified_socket in read_sockets: # If notified socket is a server socket - new connection, accept it if notified_socket == server_socket: # Accept new connection # That gives us new socket - client socket, connected to this given client only, it's unique for that client # The other returned object is ip/port set client_socket, client_address = server_socket.accept() # Client should send his name right away, receive it user = receive_message(client_socket) # If False - client disconnected before he sent his name if user is False: continue # Add accepted socket to select.select() list sockets_list.append(client_socket) # Also save username and username header clients[client_socket] = user print('Accepted new connection from {}:{}, username: {}'.format(*client_address, user['data'].decode('utf-8'))) # Else existing socket is sending a message else: # Receive message message = receive_message(notified_socket) # If False, client disconnected, cleanup if message is False: print('Closed connection from: {}'.format(clientsnotified_socket.decode('utf-8'))) # Remove from list for socket.socket() sockets_list.remove(notified_socket) # Remove from our list of users del clients[notified_socket] continue # Get user by notified socket, so we will know who sent the message user = clients[notified_socket] print(f'Received message from {user["data"].decode("utf-8")}: {message["data"].decode("utf-8")}') # Iterate over connected clients and broadcast message for client_socket in clients: # But don't sent it to sender if client_socket != notified_socket: # Send user and message (both with their headers) # We are reusing here message header sent by sender, and saved username header send by user when he connected client_socket.send(user['header'] + user['data'] + message['header'] + message['data']) # It's not really necessary to have this, but will handle some socket exceptions just in case for notified_socket in exception_sockets: # Remove from list for socket.socket() sockets_list.remove(notified_socket) # Remove from our list of users del clients[notified_socket]
socket_client.py
import socket import errno from threading import Thread HEADER_LENGTH = 10 client_socket = None # Connects to the server def connect(ip, port, my_username, error_callback): global client_socket # Create a socket # socket.AF_INET - address family, IPv4, some otehr possible are AF_INET6, AF_BLUETOOTH, AF_UNIX # socket.SOCK_STREAM - TCP, conection-based, socket.SOCK_DGRAM - UDP, connectionless, datagrams, socket.SOCK_RAW - raw IP packets client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) try: # Connect to a given ip and port client_socket.connect((ip, port)) except Exception as e: # Connection error error_callback('Connection error: {}'.format(str(e))) return False # Prepare username and header and send them # We need to encode username to bytes, then count number of bytes and prepare header of fixed size, that we encode to bytes as well username = my_username.encode('utf-8') username_header = f"{len(username):<{HEADER_LENGTH}}".encode('utf-8') client_socket.send(username_header + username) return True # Sends a message to the server def send(message): # Encode message to bytes, prepare header and convert to bytes, like for username above, then send message = message.encode('utf-8') message_header = f"{len(message):<{HEADER_LENGTH}}".encode('utf-8') client_socket.send(message_header + message) # Starts listening function in a thread # incoming_message_callback - callback to be called when new message arrives # error_callback - callback to be called on error def start_listening(incoming_message_callback, error_callback): Thread(target=listen, args=(incoming_message_callback, error_callback), daemon=True).start() # Listens for incomming messages def listen(incoming_message_callback, error_callback): while True: try: # Now we want to loop over received messages (there might be more than one) and print them while True: # Receive our "header" containing username length, it's size is defined and constant username_header = client_socket.recv(HEADER_LENGTH) # If we received no data, server gracefully closed a connection, for example using socket.close() or socket.shutdown(socket.SHUT_RDWR) if not len(username_header): error_callback('Connection closed by the server') # Convert header to int value username_length = int(username_header.decode('utf-8').strip()) # Receive and decode username username = client_socket.recv(username_length).decode('utf-8') # Now do the same for message (as we received username, we received whole message, there's no need to check if it has any length) message_header = client_socket.recv(HEADER_LENGTH) message_length = int(message_header.decode('utf-8').strip()) message = client_socket.recv(message_length).decode('utf-8') # Print message incoming_message_callback(username, message) except Exception as e: # Any other exception - something happened, exit error_callback('Reading error: {}'.format(str(e)))
3.效果图
- 开启服务器端口(server.py)
- 运行main.py,运行了主界面
- 打开多个main.py,即开启多进程,多用户进入聊天室,开始聊天。
4.注意事项
- kivy默认对中文字体不支持,所以要自己安装font字体,流涨用的是宋体,该项目会上传至github,可以一起下载。
- 该项目不能直接用pyinstaller直接打包。并且打包成安卓的app要用Linux比较方便。window打包还不是很清楚。
参考
本文参考了如下教程 :
- 官方文档:kivy
-python教程网:Python Programming Tutorials - 国外某Youtuber的kivy教程:B站搬运
项目下载
- github-chatRoom
本作品采用 知识共享署名-相同方式共享 4.0 国际许可协议 进行许可。
评论已关闭