【游戏服务器与客户端Protobuf通信与排行榜实战】

管理员
# 游戏服务器与客户端Protobuf通信与排行榜实战 ## 目录 1. [项目概述](#1-项目概述) 2. [Protobuf基础](#2-protobuf基础) 3. [协议定义](#3-协议定义) 4. [游戏客户端实现](#4-游戏客户端实现) 5. [游戏服务器实现](#5-游戏服务器实现) 6. [排行榜系统实现](#6-排行榜系统实现) 7. [完整运行示例](#7-完整运行示例) 8. [扩展与优化](#8-扩展与优化) --- ## 1. 项目概述 ### 1.1 项目背景 本项目实现了一个完整的游戏服务器与客户端通信系统,使用Protobuf作为序列化协议,实现以下核心功能: - **通信框架**:基于TCP长连接的双向通信 - **协议设计**:使用Protobuf定义通信协议 - **排行榜系统**:实时更新的玩家分数排行榜 - **心跳机制**:保证连接的稳定性 - **重连机制**:客户端断线自动重连 ### 1.2 技术栈 ``` 技术栈: ├── 通信协议:Protobuf 3.x ├── 通信方式:TCP Socket ├── 序列化:Protobuf二进制序列化 ├── 客户端语言:Java ├── 服务器语言:Java └── 排行榜实现:TreeMap + Redis(可选) ``` ### 1.3 项目结构 ``` game-rank-system/ ├── proto/ # Protobuf协议定义 │ └── game.proto # 游戏协议定义 ├── client/ # 客户端代码 │ ├── GameClient.java # 客户端主类 │ ├── PacketEncoder.java # 编码器 │ ├── PacketDecoder.java # 解码器 │ └── ClientHandler.java # 客户端处理器 ├── server/ # 服务器代码 │ ├── GameServer.java # 服务器主类 │ ├── PacketEncoder.java # 编码器 │ ├── PacketDecoder.java # 解码器 │ ├── ServerHandler.java # 服务器处理器 │ └── RankManager.java # 排行榜管理器 └── model/ # 数据模型 ├── Player.java # 玩家模型 └── RankItem.java # 排行榜项 ``` --- ## 2. Protobuf基础 ### 2.1 什么是Protobuf Protocol Buffers(简称Protobuf)是Google开发的一种语言无关、平台无关、可扩展的序列化结构数据的方法。 **优点**: - 序列化后体积小(比JSON小3-10倍) - 序列化/反序列化速度快(比JSON快20-100倍) - 向后兼容性好 - 支持多种语言 - 自描述性强 **缺点**: - 二进制格式,不可读 - 需要预定义.proto文件 ### 2.2 Protobuf vs JSON ```java /** * Protobuf vs JSON 对比 */ public class ProtobufVsJson { public static void main(String[] args) { System.out.println("┌─────────────────────────────────────────────────────────────────┐"); System.out.println("│ 对比项 │ Protobuf │ JSON │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 序列化大小 │ 小(二进制) │ 大(文本) │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 序列化速度 │ 快 │ 慢 │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 可读性 │ 差(二进制) │ 好(文本) │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 跨语言 │ 支持 │ 支持 │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 版本兼容 │ 好 │ 一般 │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 使用场景 │ 游戏通信、RPC │ Web API、配置文件 │"); System.out.println("└─────────────────────────────────────────────────────────────────┘"); } } ``` --- ## 3. 协议定义 ### 3.1 Protobuf文件定义 ```protobuf // game.proto syntax = "proto3"; package game; // 选项设置 option java_package = "com.game.protocol"; option java_outer_classname = "GameProtocol"; // 消息类型枚举 enum MsgType { UNKNOWN = 0; // 未知消息 HEARTBEAT = 1; // 心跳消息 HEARTBEAT_ACK = 2; // 心跳响应 // 登录相关 LOGIN_REQ = 10; // 登录请求 LOGIN_RESP = 11; // 登录响应 // 排行榜相关 GET_RANK_REQ = 20; // 获取排行榜请求 GET_RANK_RESP = 21; // 获取排行榜响应 UPDATE_SCORE_REQ = 22; // 更新分数请求 UPDATE_SCORE_RESP = 23; // 更新分数响应 RANK_CHANGE_NOTIFY = 24; // 排行榜变化通知 } // 心跳消息 message HeartBeat { int64 timestamp = 1; } // 登录请求 message LoginReq { string player_id = 1; string player_name = 2; int32 level = 3; } // 登录响应 message LoginResp { int32 code = 1; string message = 2; int64 player_id = 3; } // 排行榜项 message RankItem { int32 rank = 1; // 排名 string player_id = 2; // 玩家ID string player_name = 3; // 玩家名称 int64 score = 4; // 分数 int32 level = 5; // 等级 } // 获取排行榜请求 message GetRankReq { int32 top_n = 1; // 获取前N名 } // 获取排行榜响应 message GetRankResp { int32 code = 1; string message = 2; repeated RankItem rank_list = 3; // 排行榜列表 int32 my_rank = 4; // 我的排名 } // 更新分数请求 message UpdateScoreReq { int64 score = 1; // 新分数 string reason = 2; // 更新原因 } // 更新分数响应 message UpdateScoreResp { int32 code = 1; string message = 2; int32 my_rank = 3; // 更新后的排名 int64 new_score = 4; // 新分数 } // 排行榜变化通知 message RankChangeNotify { repeated RankItem top_ranks = 1; // 前N名变化 } // 游戏消息包装 message GameMessage { MsgType msg_type = 1; // 消息类型 bytes data = 2; // 消息数据(具体消息的序列化结果) } ``` ### 3.2 编译Protobuf文件 ```bash # 编译Proto文件 protoc --java_out=./src/main/java ./proto/game.proto # 编译后生成的文件结构: # com/game/protocol/GameProtocol.java ``` ### 3.3 生成的Java类使用 ```java /** * Protobuf生成的Java类使用示例 */ public class ProtobufUsage { public void example() { // 创建登录请求 GameProtocol.LoginReq loginReq = GameProtocol.LoginReq.newBuilder() .setPlayerId("player_001") .setPlayerName("玩家001") .setLevel(10) .build(); // 序列化为字节数组 byte[] data = loginReq.toByteArray(); // 反序列化 try { GameProtocol.LoginReq parsed = GameProtocol.LoginReq.parseFrom(data); System.out.println("Player ID: " + parsed.getPlayerId()); System.out.println("Player Name: " + parsed.getPlayerName()); } catch (InvalidProtocolBufferException e) { e.printStackTrace(); } } } ``` --- ## 4. 游戏客户端实现 ### 4.1 客户端主类 ```java package com.game.client; import com.game.protocol.GameProtocol; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import java.util.Scanner; import java.util.concurrent.TimeUnit; /** * 游戏客户端 */ public class GameClient { private static final String SERVER_HOST = "127.0.0.1"; private static final int SERVER_PORT = 8080; private EventLoopGroup group; private Channel channel; private String playerId; private String playerName; public static void main(String[] args) { GameClient client = new GameClient(); client.start(); } /** * 启动客户端 */ public void start() { group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true) .handler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 编码器 pipeline.addLast("encoder", new PacketEncoder()); // 解码器 pipeline.addLast("decoder", new PacketDecoder()); // 业务处理器 pipeline.addLast("handler", new ClientHandler(GameClient.this)); } }); // 连接服务器 ChannelFuture future = bootstrap.connect(SERVER_HOST, SERVER_PORT).sync(); channel = future.channel(); System.out.println("========================================"); System.out.println(" 游戏客户端已启动"); System.out.println(" 服务器: " + SERVER_HOST + ":" + SERVER_PORT); System.out.println("========================================"); // 启动心跳线程 startHeartbeat(); // 等待连接建立 Thread.sleep(1000); // 处理用户输入 handleUserInput(); // 等待连接关闭 channel.closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { shutdown(); } } /** * 处理用户输入 */ private void handleUserInput() { Scanner scanner = new Scanner(System.in); while (true) { System.out.println("\n========================================"); System.out.println("请选择操作:"); System.out.println("1. 登录"); System.out.println("2. 获取排行榜"); System.out.println("3. 更新分数"); System.out.println("4. 退出"); System.out.println("========================================"); System.out.print("请输入选项: "); String choice = scanner.nextLine(); switch (choice) { case "1": login(scanner); break; case "2": getRank(scanner); break; case "3": updateScore(scanner); break; case "4": System.out.println("正在退出..."); shutdown(); return; default: System.out.println("无效的选项!"); } } } /** * 登录 */ private void login(Scanner scanner) { System.out.print("请输入玩家ID: "); playerId = scanner.nextLine(); System.out.print("请输入玩家名称: "); playerName = scanner.nextLine(); System.out.print("请输入玩家等级: "); int level = Integer.parseInt(scanner.nextLine()); // 构建登录请求 GameProtocol.LoginReq loginReq = GameProtocol.LoginReq.newBuilder() .setPlayerId(playerId) .setPlayerName(playerName) .setLevel(level) .build(); // 发送登录请求 sendMsg(GameProtocol.MsgType.LOGIN_REQ, loginReq); System.out.println("发送登录请求..."); } /** * 获取排行榜 */ private void getRank(Scanner scanner) { if (playerId == null) { System.out.println("请先登录!"); return; } System.out.print("请输入获取前N名: "); int topN = Integer.parseInt(scanner.nextLine()); // 构建获取排行榜请求 GameProtocol.GetRankReq getRankReq = GameProtocol.GetRankReq.newBuilder() .setTopN(topN) .build(); // 发送获取排行榜请求 sendMsg(GameProtocol.MsgType.GET_RANK_REQ, getRankReq); System.out.println("发送获取排行榜请求..."); } /** * 更新分数 */ private void updateScore(Scanner scanner) { if (playerId == null) { System.out.println("请先登录!"); return; } System.out.print("请输入新分数: "); long score = Long.parseLong(scanner.nextLine()); System.out.print("请输入更新原因: "); String reason = scanner.nextLine(); // 构建更新分数请求 GameProtocol.UpdateScoreReq updateScoreReq = GameProtocol.UpdateScoreReq.newBuilder() .setScore(score) .setReason(reason) .build(); // 发送更新分数请求 sendMsg(GameProtocol.MsgType.UPDATE_SCORE_REQ, updateScoreReq); System.out.println("发送更新分数请求..."); } /** * 发送消息 */ public void sendMsg(GameProtocol.MsgType msgType, com.google.protobuf.GeneratedMessageV3 message) { if (channel == null || !channel.isActive()) { System.out.println("连接未建立或已断开!"); return; } // 构建游戏消息 GameProtocol.GameMessage gameMsg = GameProtocol.GameMessage.newBuilder() .setMsgType(msgType) .setData(message.toByteArray()) .build(); // 发送消息 channel.writeAndFlush(gameMsg); } /** * 启动心跳 */ private void startHeartbeat() { channel.eventLoop().scheduleAtFixedRate(() -> { if (channel != null && channel.isActive()) { // 发送心跳 GameProtocol.HeartBeat heartBeat = GameProtocol.HeartBeat.newBuilder() .setTimestamp(System.currentTimeMillis()) .build(); sendMsg(GameProtocol.MsgType.HEARTBEAT, heartBeat); } }, 10, 10, TimeUnit.SECONDS); } /** * 关闭客户端 */ public void shutdown() { if (channel != null) { channel.close(); } if (group != null) { group.shutdownGracefully(); } System.exit(0); } public void setPlayerId(String playerId) { this.playerId = playerId; } public void setPlayerName(String playerName) { this.playerName = playerName; } } ``` ### 4.2 客户端编码器 ```java package com.game.client; import com.game.protocol.GameProtocol; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * 客户端编码器 * 将Protobuf消息编码为字节流 */ public class PacketEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, GameProtocol.GameMessage msg, ByteBuf out) throws Exception { // 获取消息的字节数组 byte[] data = msg.toByteArray(); // 写入数据长度(4字节) out.writeInt(data.length); // 写入数据 out.writeBytes(data); } } ``` ### 4.3 客户端解码器 ```java package com.game.client; import com.game.protocol.GameProtocol; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 客户端解码器 * 将字节流解码为Protobuf消息 */ public class PacketDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { // 检查是否有足够的数据读取长度(4字节) if (in.readableBytes() < 4) { return; } // 标记读取位置 in.markReaderIndex(); // 读取数据长度 int dataLength = in.readInt(); // 检查是否有足够的数据读取消息体 if (in.readableBytes() < dataLength) { // 数据不够,重置读取位置,等待更多数据 in.resetReaderIndex(); return; } // 读取消息体 byte[] data = new byte[dataLength]; in.readBytes(data); // 解析为GameMessage GameProtocol.GameMessage gameMsg = GameProtocol.GameMessage.parseFrom(data); // 添加到输出列表 out.add(gameMsg); } } ``` ### 4.4 客户端处理器 ```java package com.game.client; import com.game.protocol.GameProtocol; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; /** * 客户端处理器 * 处理服务器发送的消息 */ public class ClientHandler extends SimpleChannelInboundHandler { private GameClient client; public ClientHandler(GameClient client) { this.client = client; } @Override protected void channelRead0(ChannelHandlerContext ctx, GameProtocol.GameMessage msg) throws Exception { GameProtocol.MsgType msgType = msg.getMsgType(); switch (msgType) { case HEARTBEAT_ACK: handleHeartbeatAck(msg); break; case LOGIN_RESP: handleLoginResp(msg); break; case GET_RANK_RESP: handleGetRankResp(msg); break; case UPDATE_SCORE_RESP: handleUpdateScoreResp(msg); break; case RANK_CHANGE_NOTIFY: handleRankChangeNotify(msg); break; default: System.out.println("收到未知消息类型: " + msgType); } } /** * 处理心跳响应 */ private void handleHeartbeatAck(GameProtocol.GameMessage msg) { GameProtocol.HeartBeat heartBeat = GameProtocol.HeartBeat.parseFrom(msg.getData()); System.out.println("收到心跳响应: " + heartBeat.getTimestamp()); } /** * 处理登录响应 */ private void handleLoginResp(GameProtocol.GameMessage msg) { GameProtocol.LoginResp loginResp = GameProtocol.LoginResp.parseFrom(msg.getData()); System.out.println("\n========================================"); System.out.println("登录响应:"); System.out.println(" 响应码: " + loginResp.getCode()); System.out.println(" 消息: " + loginResp.getMessage()); if (loginResp.getCode() == 0) { System.out.println(" 玩家ID: " + loginResp.getPlayerId()); System.out.println(" 登录成功!"); } System.out.println("========================================"); } /** * 处理获取排行榜响应 */ private void handleGetRankResp(GameProtocol.GameMessage msg) { GameProtocol.GetRankResp getRankResp = GameProtocol.GetRankResp.parseFrom(msg.getData()); System.out.println("\n========================================"); System.out.println("排行榜响应:"); System.out.println(" 响应码: " + getRankResp.getCode()); System.out.println(" 消息: " + getRankResp.getMessage()); System.out.println(" 我的排名: " + getRankResp.getMyRank()); System.out.println("\n排行榜:"); System.out.println(" 排名\t玩家ID\t\t玩家名称\t分数\t等级"); System.out.println(" " + "─".repeat(50)); for (GameProtocol.RankItem item : getRankResp.getRankListList()) { System.out.printf(" %d\t%s\t%s\t%d\t%d\n", item.getRank(), item.getPlayerId(), item.getPlayerName(), item.getScore(), item.getLevel()); } System.out.println("========================================"); } /** * 处理更新分数响应 */ private void handleUpdateScoreResp(GameProtocol.GameMessage msg) { GameProtocol.UpdateScoreResp updateScoreResp = GameProtocol.UpdateScoreResp.parseFrom(msg.getData()); System.out.println("\n========================================"); System.out.println("更新分数响应:"); System.out.println(" 响应码: " + updateScoreResp.getCode()); System.out.println(" 消息: " + updateScoreResp.getMessage()); System.out.println(" 我的排名: " + updateScoreResp.getMyRank()); System.out.println(" 新分数: " + updateScoreResp.getNewScore()); System.out.println("========================================"); } /** * 处理排行榜变化通知 */ private void handleRankChangeNotify(GameProtocol.GameMessage msg) { GameProtocol.RankChangeNotify notify = GameProtocol.RankChangeNotify.parseFrom(msg.getData()); System.out.println("\n========================================"); System.out.println("排行榜变化通知:"); for (GameProtocol.RankItem item : notify.getTopRanksList()) { System.out.printf(" 排名:%d, 玩家:%s, 分数:%d\n", item.getRank(), item.getPlayerName(), item.getScore()); } System.out.println("========================================"); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("连接到服务器: " + ctx.channel().remoteAddress()); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("与服务器断开连接: " + ctx.channel().remoteAddress()); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.err.println("发生异常: " + cause.getMessage()); cause.printStackTrace(); ctx.close(); } } ``` --- ## 5. 游戏服务器实现 ### 5.1 服务器主类 ```java package com.game.server; import com.game.protocol.GameProtocol; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.util.concurrent.TimeUnit; /** * 游戏服务器 */ public class GameServer { private static final int SERVER_PORT = 8080; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private RankManager rankManager; public static void main(String[] args) { GameServer server = new GameServer(); server.start(); } /** * 启动服务器 */ public void start() { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); // 创建排行榜管理器 rankManager = new RankManager(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); // 编码器 pipeline.addLast("encoder", new PacketEncoder()); // 解码器 pipeline.addLast("decoder", new PacketDecoder()); // 业务处理器 pipeline.addLast("handler", new ServerHandler(GameServer.this, rankManager)); } }); // 绑定端口 ChannelFuture future = bootstrap.bind(SERVER_PORT).sync(); System.out.println("========================================"); System.out.println(" 游戏服务器已启动"); System.out.println(" 监听端口: " + SERVER_PORT); System.out.println("========================================"); // 启动定时任务:排行榜变化通知 startRankNotifyTask(); // 等待服务器关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { shutdown(); } } /** * 启动排行榜变化通知任务 */ private void startRankNotifyTask() { workerGroup.scheduleAtFixedRate(() -> { rankManager.broadcastTopRanks(); }, 30, 30, TimeUnit.SECONDS); } /** * 关闭服务器 */ public void shutdown() { if (bossGroup != null) { bossGroup.shutdownGracefully(); } if (workerGroup != null) { workerGroup.shutdownGracefully(); } } } ``` ### 5.2 服务器编码器 ```java package com.game.server; import com.game.protocol.GameProtocol; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * 服务器编码器 */ public class PacketEncoder extends MessageToByteEncoder { @Override protected void encode(ChannelHandlerContext ctx, GameProtocol.GameMessage msg, ByteBuf out) throws Exception { // 获取消息的字节数组 byte[] data = msg.toByteArray(); // 写入数据长度(4字节) out.writeInt(data.length); // 写入数据 out.writeBytes(data); } } ``` ### 5.3 服务器解码器 ```java package com.game.server; import com.game.protocol.GameProtocol; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import java.util.List; /** * 服务器解码器 */ public class PacketDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { // 检查是否有足够的数据读取长度(4字节) if (in.readableBytes() < 4) { return; } // 标记读取位置 in.markReaderIndex(); // 读取数据长度 int dataLength = in.readInt(); // 检查是否有足够的数据读取消息体 if (in.readableBytes() < dataLength) { // 数据不够,重置读取位置,等待更多数据 in.resetReaderIndex(); return; } // 读取消息体 byte[] data = new byte[dataLength]; in.readBytes(data); // 解析为GameMessage GameProtocol.GameMessage gameMsg = GameProtocol.GameMessage.parseFrom(data); // 添加到输出列表 out.add(gameMsg); } } ``` ### 5.4 服务器处理器 ```java package com.game.server; import com.game.protocol.GameProtocol; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.util.concurrent.ConcurrentHashMap; /** * 服务器处理器 */ public class ServerHandler extends SimpleChannelInboundHandler { private GameServer server; private RankManager rankManager; // 管理所有在线客户端 private static final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 玩家ID与Channel的映射 private static final ConcurrentHashMap playerChannels = new ConcurrentHashMap<>(); public ServerHandler(GameServer server, RankManager rankManager) { this.server = server; this.rankManager = rankManager; } @Override protected void channelRead0(ChannelHandlerContext ctx, GameProtocol.GameMessage msg) throws Exception { GameProtocol.MsgType msgType = msg.getMsgType(); switch (msgType) { case HEARTBEAT: handleHeartbeat(ctx, msg); break; case LOGIN_REQ: handleLoginReq(ctx, msg); break; case GET_RANK_REQ: handleGetRankReq(ctx, msg); break; case UPDATE_SCORE_REQ: handleUpdateScoreReq(ctx, msg); break; default: System.out.println("收到未知消息类型: " + msgType); } } /** * 处理心跳 */ private void handleHeartbeat(ChannelHandlerContext ctx, GameProtocol.GameMessage msg) { GameProtocol.HeartBeat heartBeat = GameProtocol.HeartBeat.parseFrom(msg.getData()); // 响应心跳 GameProtocol.HeartBeat heartbeatAck = GameProtocol.HeartBeat.newBuilder() .setTimestamp(heartBeat.getTimestamp()) .build(); sendMsg(ctx, GameProtocol.MsgType.HEARTBEAT_ACK, heartbeatAck); } /** * 处理登录请求 */ private void handleLoginReq(ChannelHandlerContext ctx, GameProtocol.GameMessage msg) { GameProtocol.LoginReq loginReq = GameProtocol.LoginReq.parseFrom(msg.getData()); System.out.println("玩家登录: " + loginReq.getPlayerId() + ", " + loginReq.getPlayerName()); // 保存玩家ID与Channel的映射 playerChannels.put(loginReq.getPlayerId(), ctx.channel()); // 响应登录 GameProtocol.LoginResp loginResp = GameProtocol.LoginResp.newBuilder() .setCode(0) .setMessage("登录成功") .setPlayerId(Long.parseLong(loginReq.getPlayerId().replace("player_", ""))) .build(); sendMsg(ctx, GameProtocol.MsgType.LOGIN_RESP, loginResp); } /** * 处理获取排行榜请求 */ private void handleGetRankReq(ChannelHandlerContext ctx, GameProtocol.GameMessage msg) { GameProtocol.GetRankReq getRankReq = GameProtocol.GetRankReq.parseFrom(msg.getData()); // 获取排行榜 String playerId = getPlayerIdByChannel(ctx.channel()); java.util.List rankList = rankManager.getTopRanks(getRankReq.getTopN()); int myRank = rankManager.getPlayerRank(playerId); // 构建排行榜项列表 java.util.List rankItemList = new java.util.ArrayList<>(); for (RankItem item : rankList) { GameProtocol.RankItem rankItem = GameProtocol.RankItem.newBuilder() .setRank(item.getRank()) .setPlayerId(item.getPlayerId()) .setPlayerName(item.getPlayerName()) .setScore(item.getScore()) .setLevel(item.getLevel()) .build(); rankItemList.add(rankItem); } // 响应获取排行榜 GameProtocol.GetRankResp getRankResp = GameProtocol.GetRankResp.newBuilder() .setCode(0) .setMessage("获取排行榜成功") .addAllRankList(rankItemList) .setMyRank(myRank) .build(); sendMsg(ctx, GameProtocol.MsgType.GET_RANK_RESP, getRankResp); } /** * 处理更新分数请求 */ private void handleUpdateScoreReq(ChannelHandlerContext ctx, GameProtocol.GameMessage msg) { GameProtocol.UpdateScoreReq updateScoreReq = GameProtocol.UpdateScoreReq.parseFrom(msg.getData()); // 更新分数 String playerId = getPlayerIdByChannel(ctx.channel()); String playerName = getPlayerNameByChannel(ctx.channel()); int newRank = rankManager.updateScore(playerId, playerName, updateScoreReq.getScore()); // 响应更新分数 GameProtocol.UpdateScoreResp updateScoreResp = GameProtocol.UpdateScoreResp.newBuilder() .setCode(0) .setMessage("更新分数成功") .setMyRank(newRank) .setNewScore(updateScoreReq.getScore()) .build(); sendMsg(ctx, GameProtocol.MsgType.UPDATE_SCORE_RESP, updateScoreResp); } /** * 发送消息 */ private void sendMsg(ChannelHandlerContext ctx, GameProtocol.MsgType msgType, com.google.protobuf.GeneratedMessageV3 message) { GameProtocol.GameMessage gameMsg = GameProtocol.GameMessage.newBuilder() .setMsgType(msgType) .setData(message.toByteArray()) .build(); ctx.writeAndFlush(gameMsg); } /** * 广播消息 */ public static void broadcast(GameProtocol.GameMessage msg) { channels.writeAndFlush(msg); } /** * 根据Channel获取玩家ID */ private String getPlayerIdByChannel(Channel channel) { for (java.util.Map.Entry entry : playerChannels.entrySet()) { if (entry.getValue() == channel) { return entry.getKey(); } } return null; } /** * 根据Channel获取玩家名称 */ private String getPlayerNameByChannel(Channel channel) { // 简化实现,实际应该从玩家信息中获取 return "玩家"; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { channels.add(ctx.channel()); System.out.println("客户端连接: " + ctx.channel().remoteAddress()); super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { channels.remove(ctx.channel()); // 从playerChannels中移除 String playerId = getPlayerIdByChannel(ctx.channel()); if (playerId != null) { playerChannels.remove(playerId); } System.out.println("客户端断开: " + ctx.channel().remoteAddress()); super.channelInactive(ctx); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.err.println("发生异常: " + cause.getMessage()); cause.printStackTrace(); ctx.close(); } } ``` --- ## 6. 排行榜系统实现 ### 6.1 排行榜管理器 ```java package com.game.server; import com.game.protocol.GameProtocol; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.util.*; import java.util.concurrent.ConcurrentSkipListMap; /** * 排行榜管理器 * 使用TreeMap实现高效的排行榜 */ public class RankManager { // 使用TreeMap实现排行榜,按分数降序排列 // Key: 分数(取负值以实现降序) // Value: 玩家信息列表 private TreeMap> rankTree; // 玩家ID与分数的映射 private Map playerScores; // 玩家ID与玩家信息的映射 private Map playerInfo; public RankManager() { // 使用比较器实现降序 rankTree = new TreeMap<>(Collections.reverseOrder()); playerScores = new ConcurrentHashMap<>(); playerInfo = new ConcurrentHashMap<>(); // 初始化一些测试数据 initTestData(); } /** * 初始化测试数据 */ private void initTestData() { // 添加一些测试玩家 updateScore("player_test1", "测试玩家1", 10000); updateScore("player_test2", "测试玩家2", 8500); updateScore("player_test3", "测试玩家3", 9200); updateScore("player_test4", "测试玩家4", 7800); updateScore("player_test5", "测试玩家5", 9500); } /** * 更新玩家分数 * @param playerId 玩家ID * @param playerName 玩家名称 * @param score 新分数 * @return 更新后的排名 */ public synchronized int updateScore(String playerId, String playerName, long score) { // 如果玩家之前有分数,先移除 if (playerScores.containsKey(playerId)) { long oldScore = playerScores.get(playerId); List oldList = rankTree.get(oldScore); if (oldList != null) { oldList.removeIf(item -> item.getPlayerId().equals(playerId)); if (oldList.isEmpty()) { rankTree.remove(oldScore); } } } // 更新玩家分数 playerScores.put(playerId, score); // 创建排行榜项 RankItem rankItem = new RankItem(); rankItem.setPlayerId(playerId); rankItem.setPlayerName(playerName); rankItem.setScore(score); rankItem.setLevel(1); // 添加到排行榜 rankTree.computeIfAbsent(score, k -> new ArrayList<>()).add(rankItem); playerInfo.put(playerId, rankItem); // 重新计算排名 recalculateRanks(); // 返回玩家排名 return getPlayerRank(playerId); } /** * 重新计算所有玩家的排名 */ private synchronized void recalculateRanks() { int rank = 1; for (List items : rankTree.values()) { for (RankItem item : items) { item.setRank(rank++); } } } /** * 获取前N名玩家 * @param topN 前N名 * @return 排行榜列表 */ public synchronized List getTopRanks(int topN) { List result = new ArrayList<>(); int count = 0; for (List items : rankTree.values()) { for (RankItem item : items) { if (count >= topN) { return result; } result.add(item); count++; } } return result; } /** * 获取玩家排名 * @param playerId 玩家ID * @return 排名(如果玩家不在排行榜中,返回-1) */ public synchronized int getPlayerRank(String playerId) { RankItem item = playerInfo.get(playerId); return item != null ? item.getRank() : -1; } /** * 获取玩家分数 * @param playerId 玩家ID * @return 分数(如果玩家不在排行榜中,返回-1) */ public synchronized long getPlayerScore(String playerId) { return playerScores.getOrDefault(playerId, -1L); } /** * 广播排行榜变化通知 */ public void broadcastTopRanks() { List topRanks = getTopRanks(10); // 构建排行榜项列表 List rankItemList = new ArrayList<>(); for (RankItem item : topRanks) { GameProtocol.RankItem rankItem = GameProtocol.RankItem.newBuilder() .setRank(item.getRank()) .setPlayerId(item.getPlayerId()) .setPlayerName(item.getPlayerName()) .setScore(item.getScore()) .setLevel(item.getLevel()) .build(); rankItemList.add(rankItem); } // 构建排行榜变化通知 GameProtocol.RankChangeNotify notify = GameProtocol.RankChangeNotify.newBuilder() .addAllTopRanks(rankItemList) .build(); // 构建游戏消息 GameProtocol.GameMessage gameMsg = GameProtocol.GameMessage.newBuilder() .setMsgType(GameProtocol.MsgType.RANK_CHANGE_NOTIFY) .setData(notify.toByteArray()) .build(); // 广播给所有客户端 ServerHandler.broadcast(gameMsg); System.out.println("广播排行榜变化,前10名已更新"); } /** * 获取排行榜总人数 * @return 总人数 */ public synchronized int getTotalPlayers() { return playerScores.size(); } /** * 清空排行榜 */ public synchronized void clear() { rankTree.clear(); playerScores.clear(); playerInfo.clear(); } } ``` ### 6.2 排行榜数据模型 ```java package com.game.server; /** * 排行榜项 */ public class RankItem { private int rank; // 排名 private String playerId; // 玩家ID private String playerName; // 玩家名称 private long score; // 分数 private int level; // 等级 public RankItem() { } public RankItem(String playerId, String playerName, long score, int level) { this.playerId = playerId; this.playerName = playerName; this.score = score; this.level = level; } // Getter和Setter方法 public int getRank() { return rank; } public void setRank(int rank) { this.rank = rank; } public String getPlayerId() { return playerId; } public void setPlayerId(String playerId) { this.playerId = playerId; } public String getPlayerName() { return playerName; } public void setPlayerName(String playerName) { this.playerName = playerName; } public long getScore() { return score; } public void setScore(long score) { this.score = score; } public int getLevel() { return level; } public void setLevel(int level) { this.level = level; } @Override public String toString() { return String.format("RankItem{rank=%d, playerId='%s', playerName='%s', score=%d, level=%d}", rank, playerId, playerName, score, level); } } ``` --- ## 7. 完整运行示例 ### 7.1 项目依赖 ```xml 4.0.0 com.game game-rank-system 1.0.0 UTF-8 1.8 1.8 4.1.68.Final 3.17.3 io.netty netty-all ${netty.version} com.google.protobuf protobuf-java ${protobuf.version} org.slf4j slf4j-api 1.7.32 ch.qos.logback logback-classic 1.2.6 org.xolstice.maven.plugins protobuf-maven-plugin 0.6.1 com.google.protobuf:protoc:3.17.3:exe:${os.detected.classifier} grpc-java compile compile-custom org.apache.maven.plugins maven-compiler-plugin 3.8.1 1.8 1.8 kr.motd.maven os-maven-plugin 1.7.0 ``` ### 7.2 运行步骤 ```bash # 1. 编译Protobuf文件 protoc --java_out=./src/main/java ./proto/game.proto # 2. 编译项目 mvn clean compile # 3. 启动游戏服务器 mvn exec:java -Dexec.mainClass="com.game.server.GameServer" # 4. 启动游戏客户端(在另一个终端) mvn exec:java -Dexec.mainClass="com.game.client.GameClient" ``` ### 7.3 客户端操作示例 ``` ======================================== 游戏客户端已启动 服务器: 127.0.0.1:8080 ======================================== 连接到服务器: /127.0.0.1:8080 ======================================== 请选择操作: 1. 登录 2. 获取排行榜 3. 更新分数 4. 退出 ======================================== 请输入选项: 1 请输入玩家ID: player_001 请输入玩家名称: 玩家001 请输入玩家等级: 10 发送登录请求... ======================================== 登录响应: 响应码: 0 消息: 登录成功 玩家ID: 1 登录成功! ======================================== ======================================== 请选择操作: 1. 登录 2. 获取排行榜 3. 更新分数 4. 退出 ======================================== 请输入选项: 3 请输入新分数: 9999 请输入更新原因: 完成关卡 发送更新分数请求... ======================================== 更新分数响应: 响应码: 0 消息: 更新分数成功 我的排名: 3 新分数: 9999 ======================================== ======================================== 请选择操作: 1. 登录 2. 获取排行榜 3. 更新分数 4. 退出 ======================================== 请输入选项: 2 请输入获取前N名: 10 发送获取排行榜请求... ======================================== 排行榜响应: 响应码: 0 消息: 获取排行榜成功 我的排名: 3 排行榜: 排名 玩家ID 玩家名称 分数 等级 ─────────────────────────────────────────────────── 1 player_test1 测试玩家1 10000 1 2 player_test5 测试玩家5 9500 1 3 player_001 玩家001 9999 1 4 player_test3 测试玩家3 9200 1 5 player_test2 测试玩家2 8500 1 6 player_test4 测试玩家4 7800 1 ======================================== ``` ### 7.4 服务器日志示例 ``` ======================================== 游戏服务器已启动 监听端口: 8080 ======================================== 客户端连接: /127.0.0.1:52056 玩家登录: player_001, 玩家001 广播排行榜变化,前10名已更新 ``` --- ## 8. 扩展与优化 ### 8.1 使用Redis实现排行榜 ```java package com.game.server; import redis.clients.jedis.Jedis; import redis.clients.jedis.JedisPool; import redis.clients.jedis.Tuple; import java.util.ArrayList; import java.util.List; import java.util.Set; /** * 基于Redis的排行榜实现 */ public class RedisRankManager { private JedisPool jedisPool; private static final String RANK_KEY = "game:rank"; public RedisRankManager(String host, int port) { this.jedisPool = new JedisPool(host, port); } /** * 更新玩家分数 */ public void updateScore(String playerId, String playerName, long score) { try (Jedis jedis = jedisPool.getResource()) { // 使用ZSET存储排行榜 jedis.zadd(RANK_KEY, score, playerId); // 存储玩家名称 jedis.hset("game:player:info", playerId, playerName); } } /** * 获取前N名玩家 */ public List getTopRanks(int topN) { try (Jedis jedis = jedisPool.getResource()) { // 获取前N名(降序) Set tuples = jedis.zrevrangeWithScores(RANK_KEY, 0, topN - 1); List result = new ArrayList<>(); int rank = 1; for (Tuple tuple : tuples) { String playerId = tuple.getElement(); double score = tuple.getScore(); String playerName = jedis.hget("game:player:info", playerId); RankItem item = new RankItem(playerId, playerName, (long) score, 1); item.setRank(rank++); result.add(item); } return result; } } /** * 获取玩家排名 */ public int getPlayerRank(String playerId) { try (Jedis jedis = jedisPool.getResource()) { Long rank = jedis.zrevrank(RANK_KEY, playerId); return rank != null ? rank.intValue() + 1 : -1; } } /** * 获取玩家分数 */ public long getPlayerScore(String playerId) { try (Jedis jedis = jedisPool.getResource()) { Double score = jedis.zscore(RANK_KEY, playerId); return score != null ? score.longValue() : -1L; } } /** * 获取排行榜总人数 */ public int getTotalPlayers() { try (Jedis jedis = jedisPool.getResource()) { Long count = jedis.zcard(RANK_KEY); return count != null ? count.intValue() : 0; } } /** * 清空排行榜 */ public void clear() { try (Jedis jedis = jedisPool.getResource()) { jedis.del(RANK_KEY); jedis.del("game:player:info"); } } /** * 关闭连接池 */ public void close() { jedisPool.close(); } } ``` ### 8.2 添加多服务器支持 ```java package com.game.server; import com.game.protocol.GameProtocol; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * 支持多服务器的游戏服务器 */ public class MultiServerGameServer { private static final int[] SERVER_PORTS = {8080, 8081, 8082}; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; private RankManager rankManager; public void start() { bossGroup = new NioEventLoopGroup(SERVER_PORTS.length); workerGroup = new NioEventLoopGroup(); rankManager = new RankManager(); for (int port : SERVER_PORTS) { startServer(port); } System.out.println("========================================"); System.out.println(" 游戏服务器群已启动"); System.out.println(" 监听端口: " + java.util.Arrays.toString(SERVER_PORTS)); System.out.println("========================================"); } private void startServer(int port) { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("encoder", new PacketEncoder()); pipeline.addLast("decoder", new PacketDecoder()); pipeline.addLast("handler", new ServerHandler(MultiServerGameServer.this, rankManager)); } }); try { bootstrap.bind(port).sync(); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` ### 8.3 添加消息加密 ```java package com.game.server; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import javax.crypto.Cipher; import javax.crypto.spec.SecretKeySpec; import java.util.List; /** * 消息加密处理器 */ public class MessageEncryptor extends MessageToMessageEncoder { private static final String SECRET_KEY = "mySecretKey12345"; // 16字节密钥 private static final String ALGORITHM = "AES"; private Cipher encryptCipher; public MessageEncryptor() { try { SecretKeySpec keySpec = new SecretKeySpec(SECRET_KEY.getBytes(), ALGORITHM); encryptCipher = Cipher.getInstance(ALGORITHM); encryptCipher.init(Cipher.ENCRYPT_MODE, keySpec); } catch (Exception e) { e.printStackTrace(); } } @Override protected void encode(ChannelHandlerContext ctx, ByteBuf msg, List out) throws Exception { byte[] data = new byte[msg.readableBytes()]; msg.readBytes(data); // 加密数据 byte[] encryptedData = encryptCipher.doFinal(data); // 创建新的ByteBuf ByteBuf encryptedBuf = ctx.alloc().buffer(encryptedData.length); encryptedBuf.writeBytes(encryptedData); out.add(encryptedBuf); } } ``` ### 8.4 添加性能监控 ```java package com.game.server; import java.util.concurrent.atomic.AtomicLong; /** * 性能监控 */ public class PerformanceMonitor { private AtomicLong totalMessages = new AtomicLong(0); private AtomicLong totalBytes = new AtomicLong(0); private AtomicLong loginCount = new AtomicLong(0); private AtomicLong rankQueryCount = new AtomicLong(0); private AtomicLong scoreUpdateCount = new AtomicLong(0); public void incrementMessage(long bytes) { totalMessages.incrementAndGet(); totalBytes.addAndGet(bytes); } public void incrementLogin() { loginCount.incrementAndGet(); } public void incrementRankQuery() { rankQueryCount.incrementAndGet(); } public void incrementScoreUpdate() { scoreUpdateCount.incrementAndGet(); } public void printStatistics() { System.out.println("\n========================================"); System.out.println("性能统计:"); System.out.println(" 总消息数: " + totalMessages.get()); System.out.println(" 总字节数: " + totalBytes.get() + " bytes"); System.out.println(" 登录次数: " + loginCount.get()); System.out.println(" 排行榜查询次数: " + rankQueryCount.get()); System.out.println(" 分数更新次数: " + scoreUpdateCount.get()); System.out.println("========================================\n"); } } ``` ### 8.5 性能优化建议 ```java /** * 性能优化建议 */ public class PerformanceOptimization { public static void main(String[] args) { System.out.println("┌─────────────────────────────────────────────────────────────────┐"); System.out.println("│ 优化项 │ 优化方案 │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 排行榜性能 │ 使用Redis ZSET替代TreeMap │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 并发处理 │ 使用NIO + Netty │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 消息压缩 │ 使用GZIP压缩Protobuf数据 │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 缓存热点数据 │ 使用本地缓存排行榜Top100 │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 连接池 │ 使用对象池管理Channel和对象 │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 异步处理 │ 使用CompletableFuture异步处理 │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 分片处理 │ 按玩家ID分片处理排行榜 │"); System.out.println("├─────────────────────────────────────────────────────────────────┤"); System.out.println("│ 数据库优化 │ 批量更新、索引优化 │"); System.out.println("└─────────────────────────────────────────────────────────────────┘"); } } ``` --- ## 总结 本项目实现了一个完整的游戏服务器与客户端通信系统,使用Protobuf作为序列化协议,实现了实时排行榜功能。主要内容包括: ### 核心功能 1. **通信框架**:基于Netty的TCP长连接通信 2. **协议设计**:使用Protobuf定义了完整的游戏协议 3. **排行榜系统**:支持实时更新、查询排名、广播变化 4. **心跳机制**:保证连接稳定性 5. **多客户端支持**:支持多个客户端同时连接 ### 技术亮点 1. **Protobuf协议**:高效、轻量、跨平台 2. **Netty框架**:高性能、高并发 3. **TreeMap排行榜**:O(log n)的查询和更新性能 4. **消息编解码**:支持粘包和半包处理 5. **广播机制**:实时推送排行榜变化 ### 扩展方向 1. 使用Redis ZSET实现更大规模的排行榜 2. 添加消息加密保证安全性 3. 实现多服务器集群支持 4. 添加性能监控和日志系统 5. 实现玩家分组和好友系统 这个项目为游戏开发提供了一个完整的通信和排行榜解决方案,可以根据实际需求进行扩展和优化!
评论 0

发表评论 取消回复

Shift+Enter 换行  ·  Enter 发送
还没有评论,来发表第一条吧