大学做html个人网站素材淮安市汽车网站建设背景
2026/1/14 14:25:40 网站建设 项目流程
大学做html个人网站素材,淮安市汽车网站建设背景,wordpress发布图片插件,e福州怎么代缴医保RabbitMQ 客户端 连接、发送、接收处理消息 一. RabbitMQ 的机制跟 Tcp、Udp、Http 这种还不太一样 RabbitMQ 服务#xff0c;不是像其他服务器一样#xff0c;负责逻辑处理#xff0c;然后转发给客户端 而是所有客户端想要向 RabbitMQ服务发送消息#xff0c; 第一步…RabbitMQ 客户端 连接、发送、接收处理消息一. RabbitMQ 的机制跟 Tcp、Udp、Http 这种还不太一样RabbitMQ 服务不是像其他服务器一样负责逻辑处理然后转发给客户端而是所有客户端想要向 RabbitMQ服务发送消息第一步创建一个链接 RabbitMQ 服务的连接需要传入 RabbitMQ服务地址、用户名、密码然后在连接代码中传入一个 queue 的字符串作为 标志连接成功后RabbitMQ服务上就可以看到这个链接了如下图可以看到有一个 Name queueL1 的连接后边有链接状态、消息数Ready 和 Total 都是 0向 RabbitMQ 发送消息的(1) 如果没有建立连接执行第一步建立一个链接(2) 通过 发送消息接口向 RabbitMQ 服务 发消息(3) RabbitMQ 服务接收到消息只是按照连接的 queue 分别把消息放在自己名字的 queue 下 RabbitMQ 服务只是存着客户端发送的消息服务什么都不处理向 RabbitMQ 服务发送几条消息下图可以看到 queueL1 的队列已经接收了 5 条消息这五条消息如果没有客户端接收处理就一直在这存着接收 RabbitMQ 服务消息(1) 如果没有建立连接执行第一步建立一个链接(2) 注册接收消息接口在 RabbitMQ 中叫 消费消息可以标记消费消息后是否将 RabbitMQ 的数据删除(3) 如果 RabbitMQ 服务收到消息就转发给 注册接收消息接口的 连接如果接收的连接标记了 AutoDelete那么发送给客户端后RabbitMQ 就会将消息从消息队列中删除注册接收消息我的客户端就会收到 RabbitMQ 发送过来的消息消息中包含发送上来的消息内容还有发送消息的 queue 名字此时再看就会发现 Ready 和 Total 又变成 0 了为什么上面讲解中将 接收 RabbitMQ 服务消息、向 RabbitMQ 发送消息的 分开说是因为 RabbitMQ 发送消息就仅仅是发消息发送完就不管了而 RabbitMQ 的消费消息(接收消息) 也仅仅是接收消息它不管是谁发的消息只要是发送的 RabbitMQ 服务的消息它都能接收(3.1) 比如我创建了 一个 连接queue名为 xxxA它发送了消息 “Hello World”xxxA 连接自己又注册了 消费消息(接收消息)那么xxxA 自己就会接收到 xxxA 队列发送的 Hello World 信息(3.2) 我又创建了 新的连接queue 名还是 xxxA那么新的连接也可以收到 (3.1) 发的 消息 HelloWorld二. 客户端连接服务器实例化一个 连接 RabbitMQ 服务的客户端连接实例化需要传入 服务地址、端口、用户名、密码using RabbitMQ.Client;using System;using System.Threading.Tasks;using RabbitMQ.Client.Exceptions;using UnityEngine;using System.Text;using RabbitMQ.Client.Events;namespace Network{////// RabbitMQ 创建一个链接/// 供 RabbitMQReceive、RabbitMQSend 使用///public class RabbitMQConnect{private RabbitMQConnectData connectData;private ConnectionFactory factory; private IChannel channel; private IConnection connection; private NetWorkState state; private Actionstring, byte[] receivedCallBack; private const int TimeOut 10; //连接超时 10 秒 private bool dispose false; public RabbitMQConnect(RabbitMQConnectData connectData) { this.connectData connectData; State NetWorkState.Disconnected; dispose false; } public string Queue { get { return connectData.queue; } } public NetWorkState State { get { return state; } private set { state value; } } public IChannel Channel { get { return channel; } } /// summary /// 网络是否连接中 /// /summary public bool IsConnect { get { if (null channel || null connection) { return false; } return channel.IsOpen connection.IsOpen; } } public async Task StartConnect() { if (State NetWorkState.Connecting) { await Task.Delay(TimeOut * 1000); } if (State NetWorkState.Connected) { return; } // 创建连接工厂 // 如果初始化失败不会启动恢复连接 //factory new ConnectionFactory() //{ // HostName hostName, // 替换为你的 RabbitMQ 服务器地址 // UserName userName, // 替换为用户名 // Password password // 替换为密码 //}; string url $amqp://{connectData.userName}:{connectData.password}{connectData.hostName}:{connectData.port}; //string.Format(amqp://unity:unity139.9.137.14:5672); factory new ConnectionFactory() { Uri new Uri(url) }; // 自动恢复连接 factory.AutomaticRecoveryEnabled true; // 如果由于异常导致恢复失败例如RabbitMQ节点仍然不可达它将在固定的时间间隔默认为5秒后重试。间隔时间可配置如下 // Connection.CloseAsync 关闭的连接不会启动自动恢复连接 factory.NetworkRecoveryInterval TimeSpan.FromSeconds(10); factory.TopologyRecoveryEnabled true; while (State ! NetWorkState.Connected) { if (!dispose) { await Connect(); } } await Task.Delay(1); if (!string.IsNullOrEmpty(connectData.receiveQueeu)) { await BasicConsumer(); } } private async Task Connect() { try { State NetWorkState.Connecting; // 异步创建连接 connection await factory.CreateConnectionAsync(); channel await connection.CreateChannelAsync(); // 声明队列 QueueDeclareOk queueDeclareOk await channel.QueueDeclareAsync( queue: connectData.queue, durable: false, exclusive: false, autoDelete: false, arguments: null); /* autoDelete true没有消费者时队列自动删除通常用于临时或一次性的队列。 autoDelete false队列不会自动删除通常用于需要长期存在的队列。 选择是否设置 autoDelete true 取决于你是否希望队列在没有消费者时自动删除。如果你的队列是临时的、一次性的那么使用 autoDelete true 会更适合如果队列是长期需要使用的则设置为 autoDelete false 会更为合适 */ State NetWorkState.Connected; // 设置消费者的预取计数为10允许同时处理10条消息 await channel.BasicQosAsync( prefetchSize: 0, prefetchCount: 10, global: false); Debug.Log(RabbitMQ Connect Success); GameNotifycation.GetInstance().NotifyNetWorkState(ENUM_MSG_TYPE.MSG_NETWORK_STATE_CHANGE, State); } catch (BrokerUnreachableException e) { await Task.Delay(5000); State NetWorkState.ConnectFailed; Debug.LogError(ConnectError: e.ToString()); // apply retry logic } await Task.Delay(1); } /// summary /// 发送消息 /// exchange: 要发布消息的交换机名称。 /// routingKey: 路由键决定消息应该路由到哪个队列。 /// mandatory: 如果设置为 trueRabbitMQ 会确保消息至少被投递到一个队列。如果没有队列接收该消息RabbitMQ 会触发 basic.return。 /// immediate: 如果设置为 trueRabbitMQ 会在消息无法立即被消费时丢弃消息。 /// basicProperties: 消息的属性类型为 IBasicProperties。这些属性可以设置消息的优先级、持久性等。 /// body: 消息体的字节数组。 /// /// BasicPublishAsync 方法 没有返回消息投递的结果。它仅仅表示“请求已经被成功发送到 RabbitMQ 的交换机”。如果发布操作成功Task 会正常完成不会抛出异常。你可以通过异常处理来捕获潜在的错误。 /// /summary /// param namemsg/param public async Task SendAsync(string message) { if (!IsConnect) { UnityEngine.Debug.Log(Send not IsConnect); await StartConnect(); } try { IChannel channel Channel; var body Encoding.UTF8.GetBytes(message); var props new BasicProperties(); props.ContentType text/plain; props.DeliveryMode DeliveryModes.Transient; await channel.BasicPublishAsync( exchange: , routingKey: Queue, mandatory: false, basicProperties: props, body: body).ConfigureAwait(false); //Debug.Log($[x] Sent: Complete); } catch (Exception ex) { UnityEngine.Debug.LogError($Error publishing message: {ex.Message}); } } /// summary /// 设置接收消息回调 /// /summary /// param namereceivedCallBack/param public void SetReceive(Actionstring, byte[] receivedCallBack) { this.receivedCallBack receivedCallBack; } /// summary /// 创建异步消费者 /// /summary /// returns/returns public async Taskstring BasicConsumer() { if (!IsConnect) { await StartConnect(); } var consumer new AsyncEventingBasicConsumer(Channel); // 处理消息的异步回调逻辑 consumer.ReceivedAsync ReceivedAsync; // 开始消费 string result await Channel.BasicConsumeAsync( queue: connectData.receiveQueeu, // 指定消费者要监听的队列名称 autoAck: false, // 决定是否自动确认消息。如果 true消息在交付时会自动确认。如果 false则需要手动调用 BasicAck 确认消息 consumer: consumer); // 指定消息的处理方式通过实现 IBasicConsumer 接口来定义如何处理从队列中接收到的消息 /* autoAck true消息一旦传递给消费者RabbitMQ 就认为该消息被成功处理无需再确认。 autoAck false消费者需要显式地调用 channel.BasicAck 来确认消息的处理通常用于消息处理失败时能够重试消息。 */ return result; } /// summary /// 异步接收消息 /// 如果 Channel.BasicConsumeAsync 方法中 autoAck 设置为 true那么 channel.BasicAckAsync 调用是不允许的 /// 想在 Channel.BasicConsumeAsync 消费消息收到消息时 调用 channel.BasicAckAsync必须将 Channel.BasicConsumeAsync 方法中 autoAck 设置为 false /// /summary /// param namesender/param /// param nameeventArgs/param /// returns/returns private async Task ReceivedAsync(object sender, BasicDeliverEventArgs eventArgs) { try { //Debug.Log(ReceivedAsync); AsyncEventingBasicConsumer consumer sender as AsyncEventingBasicConsumer; string queue consumer.Channel.CurrentQueue; var body eventArgs.Body.ToArray(); receivedCallBack?.Invoke(queue, body); // 模拟异步任务处理比如访问数据库或调用其他服务 await channel.BasicAckAsync(eventArgs.DeliveryTag, false); } catch (Exception ex) { Debug.LogError($Error processing message: {ex.Message}); // 如果处理失败可以拒绝并重新入队可选 //await Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: true); } await Task.Delay(1); } /// summary /// 关闭连接 /// /summary public async void Dispose() { dispose true; // 先关闭通道、再关闭连接 if (channel ! null) // 通道关闭 { await channel.CloseAsync(); channel null; } if (connection ! null) // 连接关闭 { UnityEngine.Debug.Log(ConnectDispose); await connection.CloseAsync(); connection null; } await Task.Delay(1); } }}RibbitMQ 服务通过 queue 来区分每一个连接的客户端代码部分如下QueueDeclareOk queueDeclareOk await channel.QueueDeclareAsync( queue: queue, durable: false, exclusive: false, autoDelete: false, arguments: null);客户端实例测试用例using UnityEngine;using Network;using LitJson;using System.Text;using System.Collections;using System.Collections.Generic;public class RabbitMQDemo : MonoBehaviour{// 客户端private RabbitMQConnect rabbitMQConnect;private Queue receiveQueue new Queue();void Start() { RabbitMQConnectData connectData new RabbitMQConnectData(); connectData.queue TestA; connectData.receiveQueeu TestA; connectData.hostName XXX.XXX.XXX.XXX; connectData.port 5672; connectData.userName unity; connectData.password unity; // 实例化 rabbitMQConnect new RabbitMQConnect(connectData); rabbitMQConnect.SetReceive(Receive); StartConnect(); } private async void StartConnect() { await rabbitMQConnect.StartConnect(); } private async void Send(string meg) { await rabbitMQConnect.SendAsync(meg); } private void Receive(string queue, byte[] byteData) { var json Encoding.UTF8.GetString(byteData); UnityEngine.Debug.Log($[x] ReceivedAsync: {json}); receiveQueue.Enqueue(netWorkData); } private int number 1000; // Update is called once per frame void Update() { if (Input.GetKeyDown(KeyCode.A)) { number; Send(Hello RabbitMQ: number); } DispatchMessage(); } private void DispatchMessage() { if (receiveQueue.Count 0) { return; } string json receiveQueue.Dequeue(); } private void OnDestroy() { Debug.LogError(OnDestroy); rabbitMQConnect.Dispose(); }}/// summary /// 网络连接状态 /// /summary public enum NetWorkState { // init /// summary /// 关闭/断开连接 /// /summary Closed, // client /// summary /// 已经建立连接 /// /summary Connected, /// summary /// 正在请求连接 /// /summary Connecting, /// summary /// 连接失败 /// /summary ConnectFailed, // both /// summary /// 连接超时 /// /summary Timeout, /// summary /// 断开连接 /// /summary Disconnected, }扩展可以在 网页上 Overview 页面找到 Ports and contexts 部分可以看到每种协议对应的端口是不一样的每种协议都有一种独立的连接方式需要根据自己选择的协议拼接路径比如 我上面代码使用的 http 方式string localHost localhost; // ip如 xxx.xxx.xxx.xxx string userName 用户名; string password 密码; // 创建连接工厂 // 如果初始化失败不会启动恢复连接 factory new ConnectionFactory() { HostName hostName, // 替换为你的 RabbitMQ 服务器地址 UserName userName, // 替换为用户名 Password password // 替换为密码 };amqp 协议连接方式如下string url $amqp://{userName}:{password}{hostName}:{port}; factory new ConnectionFactory() { Uri new Uri(url) };

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询