记录下较为完整的Rabbitmq学习笔记_编写将mq对象保存到数据库进行人工检查数据完整性-程序员宅基地

技术标签: java  rabbitmq  

Ribbitmq概括

概念

    消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

看场景理解mq

如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?这就需要消息队列登场了。

消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。

Windows环境安装

rabbitMQ是Erlang语言开发的所以先下载
Erlang:下载地址
双击安装完成后
1、配置环境变量

系统变量
ERLANG_HOME
D:\java\erl-24.0
环境变量 
%ERLANG_HOME%\bin

2、windows打开cmd控制台输入cmd,测试输入erl出现一下内容

Eshell V12.0  (abort with ^G)
1>

3、下载RabbitMQ
Rabbitmq:下载地址
4、双击安装
安装完成后,开始安装RabbitMQ-Plugins插件

先cd D:\java\RabbitMQ Server\rabbitmq_server-3.9.4\sbin

然后运行命令:rabbitmq-plugins enable rabbitmq_management

出现一下画面成功

Enabling plugins on node rabbit@WNDN-750:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@WNDN-750...
The following plugins have been enabled:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
started 3 plugins.

5、执行rabbitmqctl status,出现以下内容,说明成功

Status of node rabbit@WNDN-750 ...
[1mRuntime[0m
OS PID: 14016
OS: Windows
Uptime (seconds): 185
Is under maintenance?: false
RabbitMQ version: 3.9.4
Node name: rabbit@WNDN-750
Erlang configuration: Erlang/OTP 24 [erts-12.0] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:1] [jit]
Erlang processes: 402 used, 1048576 limit
Scheduler run queue: 1
Cluster heartbeat timeout (net_ticktime): 60
[1mPlugins[0m
...............

6、运行 D:\java\RabbitMQ Server\rabbitmq_server-3.9.4\sbin\rabbitmq-server.bat

等几秒钟,在浏览器访问http://localhost:15672/

successful……

Linux环境安装

Docker installation

1)yum 包更新到最新
> yum update
(2)安装需要的软件包, yum-util 提供yum-config-manager功能,另外两个是devicemapper驱动依赖的
> yum install -y yum-utils device-mapper-persistent-data lvm2
(3)设置yum源为阿里云
> yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo
(4)安装docker
> yum install docker-ce -y
(5)安装后查看docker版本
> docker -v
 (6) 安装加速镜像
 sudo mkdir -p /etc/docker
 sudo tee /etc/docker/daemon.json <<-'EOF'
 {
    
  "registry-mirrors": ["https://0wrdwnn6.mirror.aliyuncs.com"]
 }
 EOF
 sudo systemctl daemon-reload
 sudo systemctl restart docker
 (7) 获取rabbit镜像:
> docker pull rabbitmq:management
 (8)创建并运行容器
> docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e           RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
 (9)查看日志
> docker logs -f myrabbit
 (10)查看服务
> docker ps -a
 (11)关闭容器
> docker be9df4f0292e stop

Other commands

# 启动docker:
systemctl start docker
# 停止docker:
systemctl stop docker
# 重启docker:
systemctl restart docker
# 查看docker状态:
systemctl status docker
# 开机启动:  
systemctl enable docker
systemctl unenable docker
# 查看docker概要信息
docker info
# 查看docker帮助文档
docker --help

rabbitmq修改密码

  1. 在所有应用中找到rabbitMQ command promot程序并单击单开。
  2. 在打开的命令窗口中输入rabbitmqctl add_user test 123456后回车,test为新增登录账户,123456为账户密码。
  3. 然后再敲入rabbitmqctl set_user_tags test administrator后回车。
  4. 再给test账户设置 操作主机的权限。敲入rabbitmqctl set_permissions -p / test “.“ “.“ “.*”。
  5. 回到登录页面,用账户名为test,密码为123456进行登录就ok了。

Ribbitmq队列

消息队列协议
AMQP:(全称:Advanced Message Queuing Protocol) 是高级消息队列协议。由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
特性:
1:分布式事务支持。
2:消息的持久化支持。
3:高性能和高可靠的消息处理优势。
在这里插入图片描述
面试题:为什么ribbitmq不使用http协议

  1. 因为http协议请求头很复杂,包含了cookies,数据的加密解密,状态码等附加功能,但是对于一个消息而言,我们并不需要这么复杂,也没有这个必要性,他其实就负责数据传递,存储,分发就可以了,一定要追求的是高性能,尽量简洁,快速。
  2. 大部分情况下http都是短连接,在交互过程中可能因为服务器宕机中断以后就不会进行持久化,就会会照成请求的丢失,这样就不利于消息中间件的业务场景,因为消息中间件可能是一个长期获取消息得过程,出现问题和故障要对数据或消息进行持久化等,目的就是为了保障数据得高可靠和稳健的运行。

消息队列持久化
RabbitMQ在两种情况下会将消息写入磁盘:

  1. 消息本身在 publish 的时候就要求消息写入磁盘;
  2. 内存紧张 需要将部分内存中的消息转移到磁盘;
    在这里插入图片描述
    消息队列消费策略
    MQ消息队列有如下几个角色
    1:生产者
    2:存储消息
    3:消费者
    那么生产者生成消息以后,MQ进行存储,消费者是如何获取消息的呢?一般获取数据的方式无外乎推(push)或者拉(pull)两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送的过程,而这些推机制会适用到很多的业务场景也有很多对应推机制策略。

场景1
在这里插入图片描述
比如我在APP上下了一个订单,我们的系统和服务很多,我们如何得知这个消息被那个系统或者那些服务或者系统进行消费,那这个时候就需要一个分发的策略。这就需要消费策略。或者称之为消费的方法论。

场景2
在这里插入图片描述
在发送消息的过程中可能会出现异常,或者网络的抖动,故障等等因为造成消息的无法消费,比如用户在下订单,消费MQ接受,订单系统出现故障,导致用户支付失败,那么这个时候就需要消息中间件就必须支持消息重试机制策略。也就是支持:出现问题和故障的情况下,消息不丢失还可以进行重发。

消息队列高可用和高可靠
什么是高可用机制

高可用是指产品在规定的条件和规定的时刻或者时间内处于可执行规定功能状态的能力。

当业务量增加时,请求也过大,一台消息中间件的服务器会触及硬件(CPU、内存、磁盘)的极限,一台消息中 间件的服务器已经无法满足业务的需求,所以消息中间件必须支持集群部署,来达到高可用的目的。

什么是高可靠

在高并发应用场景中,如果不能保证系统的高可靠,那造成的隐患和损失是非常严重的。

保证中间件消息的可靠性尼?可从两个方面考虑

1:消息的传输:通过协议来保证系统间数据解析的正确性。

2、消息的存储可靠:通过持久化来保证消息的可靠性。

反正终归三句话:
1:要么消息共享,
2:要么消息同步
3:要么元数据共享

五种工作模式

    1、生产者:生成消息,发送到交换机

交换机:根据消息属性,将消息发送给队列(如果没有声明交换机,则使用默认交换机)

​ 消费者:监听这个队列,发现消息后,获取消息执行消费逻辑

​ 应用场景:

​ 常见的应用场景就是一发,一接的结构

​ 例如:

​ 手机短信,邮件单发

代码测试

package cn.tedu.test.rabbit;
import com.rabbitmq.client.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 完成简单模式一发一接的结构
 */
public class SimpleMode {
    
    //初始化连接对象 短连接
    private Channel channel;
    @Before
    public void channelInit() throws IOException, TimeoutException {
    
        //ip:port tedu/123456
        /*
            1.长链接工厂,提供4个属性 ip port tedu 123456
            2.获取长连接
            3.给成员变量channel赋值
         */
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        channel=connection.createChannel();
    }
    //测试包含3个方法
    //声明组件,交换机和队列,简单模式案例,交换机使用默认交换机.队列需要声明
    @Test
    public void myQueueDeclare() throws IOException {
    
        //调用channel的方法,声明队列
        channel.queueDeclare(
                "simple",//设置路由key
                false,//boolean类型,队列是否持久化
                false,//boolean类型,队列是否专属,
                // 只有创建声明队列的连接没有断开,队列才可用
                false,//boolean类型,队列是否自动删除.从第一个消费端监听队列开始
                //计算,直到最后一个消费端断开连接,队列就会自动删除
                null);//map类型,key值是固定一批属性
        System.out.println("队列声明成功");
    }
    //发送消息到队列 生产端,永远不会吧消息直接发给队列,发给交换机
    //目前可以使用7个交换机来接收消息
    @Test
    public void send() throws IOException {
    
        //准备个消息 发送的是byte[]
        String msg="宝贝一到手,风紧扯呼";
        byte[] msgByte=msg.getBytes();
        //将消息发给(AMQP DEFAULT)交换机 名字""
        channel.basicPublish(
                "",//发送给的交换机的名字,默认为空
                "simple",//路由key,你想让交换机把消息传递给哪个队列的名称
                null,//发送消息时,携带的头,属性等.例如
                // app_id content-type priority优先级
                msgByte//消息体
        );
    }
    //消费端
    @Test
    public void consume() throws IOException {
    
        //消费消息
        channel.basicConsume("simple", false,
                new DeliverCallback() {
    
                    /**传递回调对象. 消息就在这个对象里
                     * @param consumerTag 当前消费端id
                     * @param message 封装了消息的对象
                     * @throws IOException
                     */
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
    
                //从消息对象中拿到信息
                byte[] body = message.getBody();
                System.out.println(new String(body));
                //如果autoAck false说明消费完消息,需要手动确认
                channel.basicAck(
                        message.getEnvelope().getDeliveryTag(),
                        false);
            }
        }, new CancelCallback() {
    
                    /**
                     * 当连接对象channel 主动关闭消费端连接时 cancel 这个方法才会被调用
                     * @param consumerTag 消费端id
                     * @throws IOException
                     */
            @Override
            public void handle(String consumerTag) throws IOException {
    
            }
        });
        //使用while true 将线程卡死,否则看不到消息消费逻辑
        while(true);
    }
}
    当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢?

主要有两种模式:
1、轮询模式的分发:一个消费者一条,按均分配;
2、公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

默认轮询,以下为

package com.xuexiangban.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * @author: 学相伴-飞哥
 * @description: Producer 简单队列生产者
 * @Date : 2021/3/2
 */
public class Producer {
    
    pub
        try {
    
            // 3: 从连接工厂中获取连接
            connection = connectionFactory.newConnection("生产者");
            // 4: 从连接中获取通道channel
            channel = connection.createChannel();
            // 6: 准备发送消息的内容
            //===============================end topic模式==================================
            for (int i = 1; i <= 20; i++) {
    
                //消息的内容
                String msg = "学相伴:" + i;
                // 7: 发送消息给中间件rabbitmq-server
                // @params1: 交换机exchange
                // @params2: 队列名称/routingkey
                // @params3: 属性配置
                // @params4: 发送消息的内容
                channel.basicPublish("", "queue1", null, msg.getBytes());
            }
            System.out.println("消息发送成功!");
    }
}

消费者1的逻辑

Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
    
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
    
                    try{
    
                        System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(2000);
                    }catch(Exception ex){
    
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
    
                @Override
                public void handle(String s) throws IOException {
    
                }
            });

消费者2的逻辑

Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue1", true, new DeliverCallback() {
    
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
    
                    try{
    
                        System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(200);
                    }catch(Exception ex){
    
                        ex.printStackTrace();
                    }
                }
            }, new CancelCallback() {
    
                @Override
                public void handle(String s) throws IOException {
    
                }
            });

工作争抢
生产者:发送消息到交换机

交换机:根据消息属性将消息发送给队列

消费者:多个消费者,同时绑定监听一个队列,之间形成了争抢消息的效果

应用场景

​ 抢红包

​ 资源分配

代码实现

package cn.tedu.test.rabbit;
import com.rabbitmq.client.*;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 完成一发多抢的结构
 */
public class WorkMode {
    
    private Channel channel;
    @Before
    public void channelInit() throws IOException, TimeoutException {
    
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        channel=connection.createChannel();
    }
    @Test
    public void myQueueDeclare() throws IOException {
    
        //调用channel的方法,声明队列
        channel.queueDeclare(
                "work",
                false,
                false,
                false,
                null);
        System.out.println("队列声明成功");
    }
    @Test
    public void send() throws IOException {
    
        //准备个消息 发送的是byte[]
        String msg="宝贝一到手,风紧扯呼1111";
        byte[] msgByte=msg.getBytes();
        //将消息发给(AMQP DEFAULT)交换机 名字""
        channel.basicPublish(
                "",//发送给的交换机的名字
                "work",//路由key,你想让交换机把消息传递给哪个队列的名称
                null,//发送消息时,携带的头,属性等.例如
                // app_id content-type priority优先级
                msgByte//消息体
        );
    }
    //消费端
    @Test
    public void consume01() throws IOException {
    
        //消费消息
        channel.basicConsume("work", false,
                new DeliverCallback() {
    
                    /**传递回调对象. 消息就在这个对象里
                     * @param consumerTag 当前消费端id
                     * @param message 封装了消息的对象
                     * @throws IOException
                     */
            @Override
            public void handle(String consumerTag, Delivery message) throws IOException {
    
                //从消息对象中拿到信息
                byte[] body = message.getBody();
                System.out.println("消费者01:"+new String(body));
                //如果autoAck false说明消费完消息,需要手动确认
                channel.basicAck(
                        message.getEnvelope().getDeliveryTag(),
                        false);
            }
        }, new CancelCallback() {
    
                    /**
                     * 当连接对象channel 主动关闭消费端连接时 cancel 这个方法才会被调用
                     * @param consumerTag 消费端id
                     * @throws IOException
                     */
            @Override
            public void handle(String consumerTag) throws IOException {
    
            }
        });
        //使用while true 将线程卡死,否则看不到消息消费逻辑
        while(true);
    }
    @Test
    public void consume02() throws IOException {
    
        //消费消息
        channel.basicConsume("work", false,
                new DeliverCallback() {
    
                    /**传递回调对象. 消息就在这个对象里
                     * @param consumerTag 当前消费端id
                     * @param message 封装了消息的对象
                     * @throws IOException
                     */
                    @Override
                    public void handle(String consumerTag, Delivery message) throws IOException {
    
                        //从消息对象中拿到信息
                        byte[] body = message.getBody();
                        System.out.println("消费者02:"+new String(body));
                        //如果autoAck false说明消费完消息,需要手动确认
                        channel.basicAck(
                                message.getEnvelope().getDeliveryTag(),
                                false);
                    }
                }, new CancelCallback() {
    
                    /**
                     * 当连接对象channel 主动关闭消费端连接时 cancel 这个方法才会被调用
                     * @param consumerTag 消费端id
                     * @throws IOException
                     */
                    @Override
                    public void handle(String consumerTag) throws IOException {
    
                    }
                });
        //使用while true 将线程卡死,否则看不到消息消费逻辑
        while(true);
    }
}

路由模式
生产端:发送的消息携带具体的路由key值

交换机:接收路由key值,判断和当前交换机绑定后端队列哪个满足路由的匹配将消息发送给这个队列

应用场景

处理一些特殊的消息逻辑,可以经过路由的筛选

代码测试

package cn.tedu.test.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 路由模式
 */
public class DirectMode {
    
    //初始化连接
    private Channel channel;
    @Before
    public void channelInit() throws IOException, TimeoutException {
    
        //ip:port tedu/123456
        /*
            1.长链接工厂,提供4个属性 ip port tedu 123456
            2.获取长连接
            3.给成员变量channel赋值
         */
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        channel=connection.createChannel();
    }
    //准备交换机,队列的名称属性
    private static final String TYPE="direct";
    private static final String EX_NAME=TYPE+"_ex";//fanout_ex
    private static final String QUEUE01=TYPE+"_Q1";
    private static final String QUEUE02=TYPE+"_Q2";
    //声明三个组件 一个交换机 2个队列
    @Test
    public void declare() throws IOException {
    
        //声明队列
        channel.queueDeclare(QUEUE01,false,false,false,null);
        channel.queueDeclare(QUEUE02,false,false,false,null);
        //只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系
        //声明交换机
        channel.exchangeDeclare(EX_NAME,TYPE);//声明了一个名为 fanout_ex 类型为fanout的交换机
        //绑定交换机和队列的关系,由于发布订阅,绑定时需要提供自定义的路由key,随意
        channel.queueBind(QUEUE01,EX_NAME,"北京");
        channel.queueBind(QUEUE01,EX_NAME,"广州");
        channel.queueBind(QUEUE02,EX_NAME,"上海");
    }
    @Test
    public void send() throws IOException {
    
        String msg="你好,路由模式交换机";
        byte[] bytes = msg.getBytes();
        channel.basicPublish(EX_NAME,"北京",null,bytes);
    }
}

发布订阅
生产端:发送消息到交换机

交换机:由于是发布订阅模式,会将这个消息发送同步到后端所有与其绑定的队列

消息端:简单模式 1个队列绑定一个消费者 争抢模式 1个队列绑定多个消费者

应用场景

邮件的群发,广告的群发

代码测试

package cn.tedu.test.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 一发多接的队列结构
 */
public class FanoutMode {
    
    //初始化连接
    private Channel channel;
    @Before
    public void channelInit() throws IOException, TimeoutException {
    
        //ip:port tedu/123456
        /*
            1.长链接工厂,提供4个属性 ip port tedu 123456
            2.获取长连接
            3.给成员变量channel赋值
         */
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(15672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        channel=connection.createChannel();
    }
    //准备交换机,队列的名称属性
    private static final String TYPE="fanout";
    private static final String EX_NAME=TYPE+"_ex";//fanout_ex
    private static final String QUEUE01=TYPE+"_Q1";
    private static final String QUEUE02=TYPE+"_Q2";
    //声明三个组件 一个交换机 2个队列
    @Test
    public void declare() throws IOException {
    
        //声明队列
        channel.queueDeclare(QUEUE01,false,false,false,null);
        channel.queueDeclare(QUEUE02,false,false,false,null);
        //只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系
        //声明交换机
        channel.exchangeDeclare(EX_NAME,TYPE);//声明了一个名为 fanout_ex 类型为fanout的交换机
        //绑定交换机和队列的关系,由于发布订阅,绑定时需要提供自定义的路由key,随意
        channel.queueBind(QUEUE01,EX_NAME,"");
        channel.queueBind(QUEUE02,EX_NAME,"");
    }
    @Test
    public void send() throws IOException {
    
        String msg="你好,发布订阅模式";
        byte[] bytes = msg.getBytes();
        channel.basicPublish(EX_NAME,"北京",null,bytes);
    }
}

主题模式
结构

交换机绑定队列,不在使用具体的路由key,可以使用符号代替路由key值的规则

#:表示任意多级的任意长度的字符串

*:任意长度字符串,但是只有一级

​ 中国.北京.朝阳.望京.葫芦村

 匹配到 中国.#
 匹配到 中国.上海.#
 匹配到 中国.*.*.*
 匹配到 中国.*.朝阳.*.*

应用场景

实现多级传递的路由筛选工作,记录trace过程.

代码测试

package cn.tedu.test.rabbit;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
 * 主题模式
 */
public class TopicMode {
    
    //初始化连接
    private Channel channel;
    @Before
    public void channelInit() throws IOException, TimeoutException {
    
        //ip:port tedu/123456
        /*
            1.长链接工厂,提供4个属性 ip port tedu 123456
            2.获取长连接
            3.给成员变量channel赋值
         */
        ConnectionFactory factory=new ConnectionFactory();
        factory.setHost("192.168.91.151");
        factory.setPort(5672);
        factory.setUsername("tedu");
        factory.setPassword("123456");
        Connection connection = factory.newConnection();
        channel=connection.createChannel();
    }
    //准备交换机,队列的名称属性
    private static final String TYPE="topic";
    private static final String EX_NAME=TYPE+"_ex";//fanout_ex
    private static final String QUEUE01=TYPE+"_Q1";
    private static final String QUEUE02=TYPE+"_Q2";
    //声明三个组件 一个交换机 2个队列
    @Test
    public void declare() throws IOException {
    
        //声明队列
        channel.queueDeclare(QUEUE01,false,false,false,null);
        channel.queueDeclare(QUEUE02,false,false,false,null);
        //只会使用自己的名字,绑定默认交换机,暂时和我们自定义交换机没有关系
        //声明交换机
        channel.exchangeDeclare(EX_NAME,TYPE);//声明了一个名为 fanout_ex 类型为fanout的交换机
        //绑定交换机和队列的关系,由于发布订阅,绑定时需要提供自定义的路由key,随意
        channel.queueBind(QUEUE01,EX_NAME,"中国.北京.#");
        channel.queueBind(QUEUE01,EX_NAME,"中国.*.*.*.*");
        channel.queueBind(QUEUE02,EX_NAME,"*.上海.#");
    }
    @Test
    public void send() throws IOException {
    
        String msg="你好,路由模式交换机";
        byte[] bytes = msg.getBytes();
        channel.basicPublish(EX_NAME,"中国.北京.大兴.亦庄",null,bytes);
    }
}

SpringBoot整合rabbitmq

Fanout
使用springboot完成rabbitmq的消费-Fanout

整合业务逻辑图
在这里插入图片描述
实现步骤

  1. 创建Spring Initinlizr项目 —producer

    勾选web+rabbitmq组件,编写yml配置

    编写下单业务逻辑接口

@Component
public class OrderService {
    
    @Autowired
    private RabbitTemplate rabbitTemplate;
    private String exchangeName = "fanout_order_exchange";
    private String routeKey = "";
    public void makeOrder(Long userId, Long productId, int num) {
    
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer);
    }
}

创建配置类完成队列和交换机,并完成绑定

@Configuration
public class DirectRabbitConfig {
    
    @Bean
    public Queue emailQueue() {
    
        return new Queue("email.fanout.queue", true);
    }
    @Bean
    public Queue smsQueue() {
    
        return new Queue("sms.fanout.queue", true);
    }
    @Bean
    public Queue weixinQueue() {
    
        return new Queue("weixin.fanout.queue", true);
    }
    @Bean
    public DirectExchange fanoutOrderExchange() {
    
        return new DirectExchange("fanout_order_exchange", true, false);
    }
    @Bean
    public Binding bindingDirect1() {
    
        return BindingBuilder.bind(weixinQueue()).to(fanoutOrderExchange()).with("");
    }
    @Bean
    public Binding bindingDirect2() {
    
        return BindingBuilder.bind(smsQueue()).to(fanoutOrderExchange()).with("");
    }
    @Bean
    public Binding bindingDirect3() {
    
        return BindingBuilder.bind(emailQueue()).to(fanoutOrderExchange()).with("");
    }
}

编写一个发送消息的测试类

@SpringBootTest
class SpringBootOrderRabbitmqProducerApplicationTests {
    
    @Autowired
    OrderService orderService;
    @Test
    void contextLoads() throws InterruptedException {
    
        for (int i = 0; i < 10; i++) {
    
            Thread.sleep(1000);
            Long userId = 100L + i;
            Long productId = 10001L + i;
            int num = 10;
            orderService.makeOrder(userId, productId, num);
        }
    }
}

创建Spring Initinlizr项目 —consumer

编写一个监听消息队列的接口,监视指定队列,并消费消息

@RabbitListener(queues = "weixin.fanout.queue")
@Component
public class EmailController {
    
    @RabbitHandler
    public void  messagerevice(String msg){
    
        System.out.println("邮件发送消息:"+msg);
    }
}
@RabbitListener(queues = "sms.fanout.queue")
@Component
public class SMSController {
    
    @RabbitHandler
    public void  smsrevice(String msg){
    
        System.out.println("sms发送消息:"+msg);
    }
}
@RabbitListener(queues = "weixin.fanout.queue")
@Component
public class WechatController {
    
    @RabbitHandler
    public void  messagerevice(String msg){
    
        System.out.println("微信发送消息:"+msg);
    }
}

Direct
direct和fanout模式的区别

定义交换机的名字不同

绑定关系时添加了路由key

pull消息到queue时,指定了路由key
实现逻辑:

  1. 创建Spring Initinlizr项目 —comsumer

    勾选web+rabbitmq组件,编写yml配置

    编写下单业务逻辑接口

public void makeOrderDirect(String userId, String productId, int num) {
    
        private String routeKey1 = "sms";
        private String routeKey2 = "email";
        private String DirectExchangeName = "direct_order_exchange";
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(DirectExchangeName, routeKey1, orderNumer);
        rabbitTemplate.convertAndSend(DirectExchangeName, routeKey2, orderNumer);
    }

创建配置类完成队列和交换机,并完成绑定

@Configuration
public class DirectRabbitConfig {
    
    @Bean
    public Queue emailQueueDirect() {
    
        return new Queue("email.direct.queue", true);
    }
    @Bean
    public Queue smsQueueDirect() {
    
        return new Queue("sms.direct.queue", true);
    }
    @Bean
    public Queue weixinQueueDirect() {
    
        return new Queue("weixin.direct.queue", true);
    }
    @Bean
    //区别1
    public DirectExchange directOrderExchange() {
    
        return new DirectExchange("direct_order_exchange", true, false);
    }
    @Bean
    public Binding bindingDirect1Direct() {
    
        return BindingBuilder.bind(weixinQueueDirect()).to(directOrderExchange()).with("weixin");//区别2
    }
    @Bean
    public Binding bindingDirect2Direct() {
    
        return BindingBuilder.bind(smsQueueDirect()).to(directOrderExchange()).with("sms");
    }
    @Bean
    public Binding bindingDirect3Direct() {
    
        return BindingBuilder.bind(emailQueueDirect()).to(directOrderExchange()).with("email");
    }
}

编写一个发送消息的测试类

    @Test
    void contextLoads1() throws InterruptedException {
    
        orderService.makeOrderDirect("1","1",12);
    }
    @Test
    void contextLoads2() throws InterruptedException {
    
        orderService.makeOrderDirect("1","1",12);
    }

创建Spring Initinlizr项目 —consumer

编写一个监听消息队列的接口,监视指定队列,并消费消息

@RabbitListener(queues = "weixin.direct.queue")
@Component
public class EmailControllerDirect {
    
    @RabbitHandler
    public void  emailDirectRevice(String msg){
    
        System.out.println("direct->>>>>>>>>邮件发送消息:"+msg);
    }
}
@RabbitListener(queues = "sms.direct.queue")
@Component
public class SMSControllerDirect {
    
    @RabbitHandler
    public void  smsDirectRevice(String msg){
    
        System.out.println("direct->>>>>>>>>sms发送消息:"+msg);
    }
}
@RabbitListener(queues = "weixin.direct.queue")
@Component
public class WechatControllerDirect {
    
    @RabbitHandler
    public void  emailDirectRevice(String msg){
    
        System.out.println("direct->>>>>>>>>微信发送消息:"+msg);
    }
}

Topic
topic和direct区别

发送消息根据模糊路由匹配

没有定义配置类,绑定逻辑通过注解编写在消费端
实现逻辑

  1. 创建Spring Initinlizr项目 —comsumer

    勾选web+rabbitmq组件,编写yml配置

    编写下单业务逻辑接口

public void makeOrderTopic(String userId, String productId, int num) {
    
        // 1: 模拟用户下单
        String orderNumer = UUID.randomUUID().toString();
        /**
         *  *.email.#
         *  #.sms.#
         *  com.#
         */
        String topicExchangeName = "topic_order_exchange";
        String routeKey = "com"; //输出:topic ->>>>>>>>>微信发送消息:7aefec2c-60da-404c-ba71-cce63839c74f
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(topicExchangeName, routeKey, orderNumer);
    }

编写一个发送消息的测试类

 @Test
    void contextLoads2Topic() throws InterruptedException {
    
        orderService.makeOrderTopic("1","1",12);
    }

创建Spring Initinlizr项目 —consumer

编写一个监听消息队列的接口,监视指定队列,并消费消息

@RabbitListener(bindings =@QueueBinding(
        // email.fanout.queue 是队列名字,这个名字你可以自定随便定义。
        value = @Queue(value = "email.topic.queue",autoDelete = "false",durable = "true"),
        // order.fanout 交换机的名字 必须和生产者保持一致
        exchange = @Exchange(value = "topic_order_exchange",
                // 这里是确定的rabbitmq模式是:fanout 是以广播模式 、 发布订阅模式
                type = ExchangeTypes.TOPIC),key = "*.email.#"
))
@Component
public class EmailControllerTopic {
    
    @RabbitHandler
    public void  emailTopicRevice(String msg){
    
        System.out.println("topic->>>>>>>>>邮件发送消息:"+msg);
    }
}
@RabbitListener(bindings =@QueueBinding(
        value = @Queue(value = "sms.topic.queue",autoDelete = "false",durable = "true"),
        exchange = @Exchange(value = "topic_order_exchange",
                type = ExchangeTypes.TOPIC),key = "#.sms.#"
))
@Component
public class SMSControllerTopic {
    
    @RabbitHandler
    public void  smsTopicRevice(String msg){
    
        System.out.println("topic->>>>>>>>>sms发送消息:"+msg);
    }
}
@RabbitListener(bindings =@QueueBinding(
        value = @Queue(value = "weixin.topic.queue",autoDelete = "false",durable = "true"),
        exchange = @Exchange(value = "topic_order_exchange",
                type = ExchangeTypes.TOPIC),key = "com.#"
))
@Component
public class WechatControllerTopic {
    
    @RabbitHandler
    public void  emailTopicRevice(String msg){
    
        System.out.println("topic ->>>>>>>>>微信发送消息:"+msg);
    }
}

ttl过期时间

过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取;过了之后消息将自动被删除。RabbitMQ可以对消息和队列设置TTL。目前有两种方法可以设置。

第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。
第二种方法是对消息进行单独设置,每条消息TTL可以不同。
如果上述两种方法同时使用,则消息的过期时间以两者之间TTL较小的那个数值为准。消息在队列的生存时间一旦超过设置的TTL值,就称为dead message被投递到死信队列, 消费者将无法再收到该消息。

实现设置队列过期时间
配置类

@Configuration
public class ttlRabbitmqConfig {
    
    @Bean
    public Queue queue1(){
    
        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        return new Queue("ttl.queue",true,false,false,args);
    }
    @Bean
    public DirectExchange ttlExchange() {
    
        return new DirectExchange("ttl_order_exchange", true, false);
    }
    @Bean
    public Binding bindingExchange() {
    
        return BindingBuilder.bind(queue1()).to(ttlExchange()).with("ttl");
    }
}

业务层

public void makeOrderTtl(String userId, String productId, int num) {
    
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("user:"+orderNumer);
        String routeKey = "ttl";
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(ttlExchangeName, routeKey, orderNumer);
    }

测试类

   @Test
    void contextLoads2Ttl() throws InterruptedException {
    
        orderService.makeOrderTtl("1","1",12);
    }

消费者监视类

@RabbitListener(queues = "ttl.queue")
@Component
public class ttlController {
    
    @RabbitHandler
    public void  ttlRevice(String msg){
    
        System.out.println("ttl -->>>>>>>邮件发送消息:"+msg);
    }
}

实现设置消息过期机制
producer代码

    @Bean
    public Queue messageQueue() {
    
        return new Queue("message.queue",true);
    }
    @Bean
    public DirectExchange messageOrderExchange() {
    
        //  return new DirectExchange("TestDirectExchange",true,true);
        return new DirectExchange("message_order_exchange", true, false);
    }
     @Bean
    public Binding bindingMessage() {
    
        return BindingBuilder.bind(messageQueue()).to(messageOrderExchange()).with("message");
    }

producer发送消息代码

public void ttlOrder(String userId, String productId, int num) {
    
        String exchangeName = "message_order_exchange";
        String routeKey = "message";
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("用户 " + userId + ",订单编号是:" + orderNumer);
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
    
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
    
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("utf-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(exchangeName, routeKey,userId, messagePostProcessor);
    }

consumer消费者监听方法

@RabbitListener(queues = "message.queue")
@Component
public class messageController {
    
    @RabbitHandler
    public void messageRevice(String msg) {
    
        System.out.println("message->>>>消费消息");
    }
}

生产者测试类发送消息

@Test
    void contextLoads1() {
    
        orderService.ttlOrder("1"," 1",12);
    }

死信队列案例
概念

当一条消息在队列中出现以下三种情况的时候,该消息就会变成一条死信。

消息被拒绝(basic.reject / basic.nack),并且requeue = false
消息TTL过期
队列达到最大长度
当消息在一个队列中变成一个死信之后,如果配置了死信队列,它将被重新publish到死信交换机,死信交换机将死信投递到一个队列上,这个队列就是死信队列。
在这里插入图片描述
生产者配置类

@Bean
    public Queue queue1(){
    
        //做了参数的变更和消费不会失败,会报错
        Map<String,Object> args = new HashMap<>();
        args.put("x-message-ttl",5000);
        args.put("x-dead-letter-exchange","dead_direct_exchange");
        args.put("x-dead-letter-routing-key","dead");
        return new Queue("ttl.queue",true,false,false,args);
    }
    @Bean
    public DirectExchange ttlExchange() {
    
        return new DirectExchange("ttl_order_exchange", true, false);
    }
    @Bean
    public Binding bindingExchange() {
    
        return BindingBuilder.bind(queue1()).to(ttlExchange()).with("ttl");
    }

生产者发送消息业务

 //队列过期
    public void makeOrderTtl(String userId, String productId, int num) {
    
        String ttlExchangeName = "ttl_order_exchange";
        String routeKey = "ttl";
        String orderNumer = UUID.randomUUID().toString();
        System.out.println("user:"+orderNumer);
        // 发送订单信息给RabbitMQ fanout
        rabbitTemplate.convertAndSend(ttlExchangeName, routeKey, orderNumer);
    }

生产者发送消息测试类

 @Autowired
    private TtlService ttlService;
    @Test
    void contextLoads2Ttl() throws InterruptedException {
    
        ttlService.makeOrderTtl("1","1",12);
    }

消费者

@RabbitListener(queues = "ttl.queue")
public class TtlController {
    
    @RabbitHandler
    public void  ttlRevice(String msg){
    
        System.out.println("ttl -->>>>>>>邮件发送消息:"+msg);
    }
}

Rabbitmq分布式事务

美团业务架构图
在这里插入图片描述
系统间调用过程中事务回滚问题
在这里插入图片描述
订单服务

系统结构

order-service

   entity:    OrderDataBaseService
   pojo:    Order
   mapper:    OrderMapper
   service:    OrderService
   test:    OrderServiceApplicationTests

sql脚本

CREATE TABLE `order_service` (
  `order_id` int(50) DEFAULT NULL,
  `user_id` int(50) DEFAULT NULL,
  `order_content` varchar(50) COLLATE utf8_bin DEFAULT NULL,
  `create_time` varchar(50) COLLATE utf8_bin DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

pom.xml

srpingboot-web+rabbitmq+mybatis+jdbc+mysql+org.apache.common+com.fasterxml.jackson.dataformat

application.properties

server.port=8082
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/cn_tedu_order?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
spring.datasource.username=root
spring.datasource.password=root
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
mybatis.mapper-locations=classpath:mapper/*.xml
mybatis.type-aliases-package=cn.tedu.orderservice.pojo
# Mybatis显示sql语句输出的配置
logging.level.cn.tedu.mybatis.mapper=TRACE

order-service

entity:    OrderDataBaseService
@Component
public class OrderDataBaseService {
    
    @Autowired
    private OrderMapper orderMapper;
    public int saveOrder(Order orderInfo){
    
        int i = orderMapper.saveOrder(orderInfo);
        return i;
    }
}
mapper:    OrderMapper
@Repository
public interface OrderMapper {
    
    @Insert("insert into order_service values(#{orderId},#{userId},#{orderContent},#{createTime})")
    int saveOrder(Order order);
}
 pojo:    Order
@Lombok
public class Order {
    
    private int orderId;
    private int userId;
    private String orderContent;
    private String createTime;
}
service:    OrderService
@Service
public class OrderService {
    
    @Autowired
    private OrderDataBaseService orderDataBaseService;
    // 创建订单
    @Transactional(rollbackFor = Exception.class) // 订单创建整个方法添加事务
    public void createOrder(Order orderInfo) throws Exception {
    
        // 1: 订单信息--插入丁订单系统,订单数据库事务
        int i = orderDataBaseService.saveOrder(orderInfo);
        // 2:通過Http接口发送订单信息到运单系统
        int id = orderInfo.getOrderId();
        System.out.println("id:"+id);
        String result = dispatchHttpApi(id);
        if(!"User added successfully".equals(result)) {
    
            throw new Exception("订单创建失败,原因是运单接口调用失败!");
        }
    }
    /**
     *  模拟http请求接口发送,运单系统,将订单号传过去 springcloud
     * @return
     */
    private String dispatchHttpApi(int orderId) {
    
        /**
         * 情况1: 关闭远程服务:ConnectException: Connection refused: connect
         *
         */
        SimpleClientHttpRequestFactory factory  = new SimpleClientHttpRequestFactory();
        // 链接超时 > 3秒
        factory.setConnectTimeout(3000);
        // 处理超时 > 2秒
        factory.setReadTimeout(2000);
        // 发送http请求
        String url = "http://localhost:8081/dispatcher/order?orderId="+orderId;
        RestTemplate restTemplate = new RestTemplate(factory);//异常
        String result = restTemplate.getForObject(url, String.class);
        return result;
    }
}
test:    OrderServiceApplicationTests
@SpringBootTest
class OrderServiceApplicationTests {
    
    @Autowired
    private OrderService orderService;
    @Test
    void actionTest1() throws Exception {
    
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String dateNow = sdf.format(new Date());
        orderService.createOrder(new Order(0,0,UUID.randomUUID().toString(),dateNow));
    }
}

配送中心

系统结构

dispacher-service

   mapper:    DispacherDao
   pojo:    Dispacher
   service:    DispacherService
   web:    DispacherController

sql文件

CREATE TABLE `dispacher_service` (
  `dispacher_id` int(50) DEFAULT NULL,
  `order_id` int(50) DEFAULT NULL,
  `order_content` varchar(50) COLLATE utf8_bin DEFAULT NULL,
  `create_time` varchar(50) COLLATE utf8_bin DEFAULT NULL,
  `user_id` int(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8
_bin;
 mapper:    DispacherDao
@Repository
public interface DispatcherDao {
    
//    @Insert("insert into dispacher_service values(#{dispacherId},#{orderId},#{orderContent},#{createTime},#{userId})")
    int insertUser(Dispacher dispacher);
}
pojo:    Dispacher
@Lombok
public class Dispacher  {
    
    private int dispacherId;
    private int orderId;
    private String orderContent;
    private String createTime;
    private int userId;
}
  service:    DispacherService
@Service
//@Transactional(rollbackFor = Exception.class)
public class DispatcherService {
    
    @Autowired
    private DispatcherDao dispatcherDao;
    public boolean dispatcher(int orderId) {
    
        Dispacher dispacher = new Dispacher();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        String dateNow = sdf.format(new Date());
        dispacher.setCreateTime(dateNow);
        dispacher.setOrderId(orderId);
        dispacher.setDispacherId(orderId);
        dispacher.setOrderContent(UUID.randomUUID().toString());
        dispacher.setUserId(orderId);
        if(dispacher==null){
    
            return false;
        }else {
    
            int i = dispatcherDao.insertUser(dispacher);
            System.out.println("影响行数:"+i);
        }
        return true;
    }
}
web:    DispacherController
@RestController
@RequestMapping("/dispatcher")
public class DispatcherController {
    
    @Autowired
    DispatcherService dispatcherService;
    @Transactional(rollbackFor = Exception.class) // 订单创建整个方法添加事务
    @GetMapping("/order")
    public String lock(int orderId) throws InterruptedException {
    
        boolean dispatcher = dispatcherService.dispatcher(orderId);
        System.out.println("result:"+dispatcher);
        if(dispatcher){
    
            return "User added successfully";
        }
        return "Failed to add user";
    }
}

基于MQ的分布式事务消息的可靠生产问题

整体设计思路
在这里插入图片描述
存在的问题:

过程: 用户下单->保存到数据库->派发订单信息->保存订单信息

描述: 上面使用事务回滚导致的信息数据两个模块信息不能保持一致,即一个成功一个失败,带给用户的体验非常的差劲

解决方案:

使用mq解决

过程:用户下单->保存到数据库->新增派单信息到数据库->发送下单数据到mq保存->新增发送数据到mq的冗余 表,状态记录,->根据mq中的ack获取是否发送到mq,来决定冗余表里面的状态
在这里插入图片描述
以下代码基于以上回滚问题结构

sql脚本

CREATE TABLE `order_service_message` (
  `order_id` int(50) DEFAULT NULL,
  `status` int(50) DEFAULT NULL,
  `order_content` varchar(50) COLLATE utf8_bin DEFAULT NULL,
  `unique_id` int(50) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

pom.xml

 springboot+rabbitmq+mybaits+mysql+fastjson+test

properties

server.port=8082
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://localhost:3306/cn_tedu_order?characterEncoding=utf8&useSSL=false&serverTimezone=Asia/Shanghai&rewriteBatchedStatements=true
spring.datasource.username=root
spring.datasource.password=root
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.publisher-confirm-type=correlated
mybatis.mapper-locations=classpath:mapper/*.xml
mybatis.type-aliases-package=cn.tedu.orderservice.pojo
# Mybatis显示sql语句输出的配置
logging.level.cn.tedu.mybatis.mapper=TRACE

entity: OrderDataBaeService

@Service
public class OrderDataBaseService {
    
    @Autowired
    private OrderMapper orderMapper;
    public int saveOrder(Order orderInfo) throws Exception {
    
        int i1 = orderMapper.saveOrder(orderInfo);
        orderMapper.saveOrderMessage(
                new MessageOrder(orderInfo.getOrderId(),0,orderInfo.getOrderContent(),orderInfo.getUserId())
        );
        if(i1!=1){
    
            throw new Exception("Filed DatabasesError Action");
        }
        return i1;
    }
    public int updateStatus(int orderId) throws Exception {
    
        int i3 = orderMapper.updateStatus(orderId);
        if(i3!=1){
    
            throw new Exception("The Search Data Is Empty");
        }
        return i3;
    }
}

mapper: OrderMappe

@Repository
public interface OrderMapper {
    
    @Insert("insert into order_service values(#{orderId},#{userId},#{orderContent},#{createTime})")
    int saveOrder(Order order);
    @Insert("insert into order_service_message values(#{orderId},#{status},#{orderContent},#{uniqueId})")
    int saveOrderMessage(MessageOrder order);
    @Update("update order_service_message set status = 1 where order_id=#{orderId}")
    int updateStatus(int orderId);
}

pojo: MessageOrder

@Lombok
public class MessageOrder implements Serializable {
    
    private int orderId;
    private int status;
    private String orderContent;
    private int uniqueId;
    }
Order
@Lombok
public class Order implements Serializable {
    
    private int orderId;
    private int userId;
    private String orderContent;
    private String createTime;
    }

service: OrderService

@Service
public class OrderService {
    
    @Autowired
    private OrderDataBaseService orderDataBaseService;
    public void createOrder(Order orderInfo) throws Exception {
    
        // 1: 订单信息--插入丁订单系统,订单数据库事务
        int i = orderDataBaseService.saveOrder(orderInfo);
        if(i!=1){
    
            throw new Exception("添加用户失败");
        }
        // 2:通過Http接口发送订单信息到运单系统
        int id = orderInfo.getOrderId();
        System.out.println("id:"+id);
        String result = dispatchHttpApi(id);
        if(!"User added successfully".equals(result)) {
    
            throw new Exception("订单创建失败,原因是运单接口调用失败!");
        }
    }
    /**
     *  模拟http请求接口发送,运单系统,将订单号传过去 springcloud
     * @return
     */
    private String dispatchHttpApi(int orderId) {
    
        /**
         * 情况1: 关闭远程服务:ConnectException: Connection refused: connect
         *
         */
        SimpleClientHttpRequestFactory factory  = new SimpleClientHttpRequestFactory();
        // 链接超时 > 3秒
        factory.setConnectTimeout(3000);
        // 处理超时 > 2秒
        factory.setReadTimeout(2000);
        // 发送http请求
        String url = "http://localhost:8081/dispatcher/order?orderId="+orderId;
        RestTemplate restTemplate = new RestTemplate(factory);//异常
        String result = restTemplate.getForObject(url, String.class);
        return result;
    }
}
 TestOrderServcie
@Service
public class TestOrderService {
    
    @Autowired
    OrderMapper orderMapper;
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private OrderDataBaseService orderDataBaseService;
    //构造函数执行完成之后执行的方法,init
    @PostConstruct
    public void regCallback(){
    
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    
                System.out.println("cause:"+cause);
                String id = correlationData.getId();
                int orderId = Integer.parseInt(id);
                //如果ack为true表示消息已收到
                if(!ack){
    
                    System.out.println("下单信息投递到mq失败");
                    return;
                }
                try {
    
                    // 1: 订单信息--插入丁订单系统,订单数据库事务
                    int i = orderDataBaseService.updateStatus(orderId);
                    if(i==1){
    
                        System.out.println("修改状态成功,成功投递到mq");
                    }
                }catch (Exception e){
    
                    System.out.println("本地消息状态修改异常");
                }
            }
        });
    }
    //
    public void sendMessage(Order orderInfo){
    
        String userJson = JSON.toJSONString(orderInfo);
        int orderId = orderInfo.getOrderId();
        String s = String.valueOf(orderId);
        rabbitTemplate.convertAndSend("save-order-exchange","",userJson,new CorrelationData(s));
    }
}

基于MQ的分布式事务消息的可靠消费
可靠消费会出现的问题:

消费失败,触发mq循环重试的机制

新增消费者的代码

@Component
public class OrderConsumer {
    
    private DispatcherService dispatcherService;
    private int count = 1;
    @RabbitListener(queues = "save-order-queue")
    public void messageConsumer(String orderMsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    
            System.out.println("收到mq的消息是:"+orderMsg+",count = "+count++);
            ;
            Order order = JSONObject.parseObject(orderMsg,Order.class);
            int orderId = order.getOrderId();
            boolean dispatcher = dispatcherService.dispatcher(orderId);
            if(dispatcher){
    
                System.out.println("消费者:ok");
            }else {
    
                System.out.println("消费者error");
            }
    }
}

解决方案:

1、控制消费常识获取次数
在这里插入图片描述
新增代码

#手动ack开启
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#开启重试
spring.rabbitmq.listener.simple.retry.enabled=true
#最大重试次数
spring.rabbitmq.listener.simple.retry.max-attempts= 3
#间隔时间
spring.rabbitmq.listener.simple.retry.initial-interval=2000ms

测试…

2、手动获取ack,加上try/catch
在这里插入图片描述
变动的代码

@Component
public class OrderConsumer {
    
    private DispatcherService dispatcherService;
    private int count = 1;
    @RabbitListener(queues = "save-order-queue")
    public void messageConsumer(String orderMsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    
        try {
    
            System.out.println("收到mq的消息是:"+orderMsg+",count = "+count++);
            ;
            Order order = JSONObject.parseObject(orderMsg,Order.class);
            int orderId = order.getOrderId();
            boolean dispatcher = dispatcherService.dispatcher(orderId);
            if(dispatcher){
    
                System.out.println("消费者:ok");
            }else {
    
                System.out.println("消费者error");
            }
            channel.basicAck(tag,false);
        } catch (Exception e) {
    
            /**
             * tag      消息的tag
             * false    不会重发,会把消息打入到死信队列
             * requeue  true会死循环的重发,建议如果使用true的话,不要try/catch,否则照成死循环
             */
            channel.basicNack(tag,false,false);
        }
    }
}

基于重试的配置文件,添加以上代码测试…

3、try/catch+手动获取ack+死信队列
在这里插入图片描述
根据以上两种情况配置,添加如下配置完成分布式事务数据一致问题解决

//死信队列管理
@Configuration
public class OrderProducer {
    
    @Bean
    public Queue deadQueue(){
    
        return new Queue("dead-queue",true);
    }
    @Bean
    public DirectExchange deadExchange() {
    
        return new DirectExchange("dead-exchange", true, false);
    }
    @Bean
    public Binding deadBindingExchange() {
    
        return BindingBuilder.bind(deadQueue()).to(deadExchange()).with("");
    }
    @Bean
    public Queue orderQueue(){
    
        Map<String,Object> args = new HashMap<>();
        args.put("x-dead-letter-exchange","dead-exchange");
        return new Queue("save-order-queue",true,false,false,args);
    }
    @Bean
    public DirectExchange orderExchange() {
    
        return new DirectExchange("save-order-exchange", true, false);
    }
    @Bean
    public Binding orderBingExchange() {
    
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with("");
    }
}

以下可以不用添加测试

@Component
public class DeadConsumer {
    
    private DispatcherService dispatcherService;
    private int count = 1;
    @RabbitListener(queues = "save-order-queue")
    public void messageConsumer(String orderMsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
    
            System.out.println("收到mq的消息是:"+orderMsg+",count = "+count++);
            Order order = JSONObject.parseObject(orderMsg,Order.class);
            //编写将mq对象保存到数据库进行人工检查数据完整性~~~
    }
}

秒杀实战

实现逻辑:

 1. 生产端发送消息,传递,谁秒杀了什么
 2. 获取消息,将消息中split出来的数据作为减去库存的条件
 3. 封装数据写入到秒杀成功的数据库表中

前置条件

环境搭建

​ springboot+mybaits+redis+rabbitmq+mysql+springbootest

properties

server.port=10007
#datasource
spring.datasource.url=jdbc:mysql:///seckill
spring.datasource.password=root
spring.datasource.username=root
spring.datasource.driver-class-name=com.mysql.jdbc.Driver
#mybatis
mybatis.configuration.map-underscore-to-camel-case=true
mybatis.type-aliases-package=com.jt.common.pojo
mybatis.mapper-locations=classpath:/mappers/*.xml
#微服务配置
spring.application.name=seckill-service
eureka.client.service-url.defaultZone=http://localhost:8761/eureka/
#rabbitmq
spring.rabbitmq.host=10.9.182.139
#redis
spring.redis.cluster.nodes=10.9.182.139:8000

sql脚本

/*
SQLyog Ultimate - MySQL GUI v8.2 
MySQL - 5.5.27 : Database - seckill
*********************************************************************
*/
/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE=''*/;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
CREATE DATABASE /*!32312 IF NOT EXISTS*/`seckill` /*!40100 DEFAULT CHARACTER SET utf8 */;
USE `seckill`;
/*Table structure for table `seckill` */
DROP TABLE IF EXISTS `seckill`;
CREATE TABLE `seckill` (
  `seckill_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '商品库存id',
  `name` varchar(120) NOT NULL COMMENT '商品名称',
  `number` int(11) NOT NULL COMMENT '库存数量',
  `initial_price` bigint(20) NOT NULL COMMENT '原价',
  `seckill_price` bigint(20) NOT NULL COMMENT '秒杀价',
  `sell_point` varchar(500) NOT NULL COMMENT '卖点',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '秒杀创建时间',
  `start_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '秒杀开始时间',
  `end_time` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' COMMENT '秒杀结束时间',
  PRIMARY KEY (`seckill_id`),
  KEY `idx_create_time` (`create_time`),
  KEY `idx_start_time` (`start_time`),
  KEY `idx_end_time` (`end_time`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8 COMMENT='秒杀库存表';
/*Data for the table `seckill` */
insert  into `seckill`(`seckill_id`,`name`,`number`,`initial_price`,`seckill_price`,`sell_point`,`create_time`,`start_time`,`end_time`) values (1,'oppo10',719,2000,1000,'1000元成功秒杀oppo10','2018-05-17 11:12:49','2019-05-09 13:13:49','2020-05-18 00:00:00'),(2,'荣耀8',80,1800,800,'800元成功秒杀荣耀8','2018-01-21 22:08:49','2018-01-23 00:00:00','2018-01-24 00:00:00'),(3,'iPhone6',60,1600,600,'600元成功秒杀iPhone6','2018-01-21 22:08:49','2018-01-24 00:00:00','2018-01-25 00:00:00'),(4,'小米4',40,1400,400,'400元成功秒杀小米4','2018-01-21 22:08:49','2018-01-25 00:00:00','2018-01-26 00:00:00'),(5,'vivo2',20,1200,200,'200元成功秒杀vivo2','2018-01-21 22:08:49','2018-01-26 00:00:00','2018-01-27 00:00:00'),(6,'魅族1',10,1000,100,'100元成功秒杀魅族1','2018-01-21 22:08:49','2018-01-27 00:00:00','2018-01-28 00:00:00');
/*Table structure for table `success` */
DROP TABLE IF EXISTS `success`;
CREATE TABLE `success` (
  `success_id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '秒杀成功id',
  `seckill_id` bigint(20) NOT NULL COMMENT '秒杀商品id',
  `user_phone` bigint(20) NOT NULL COMMENT '用户手机号',
  `state` tinyint(4) NOT NULL DEFAULT '-1' COMMENT '状态标志:-1:无效;0:成功;1:已付款;2:已发货',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '秒杀成功创建时间',
  PRIMARY KEY (`success_id`),
  KEY `idx_create_time` (`create_time`)
) ENGINE=InnoDB AUTO_INCREMENT=6382 DEFAULT CHARSET=utf8 COMMENT='秒杀成功明细表';
/*Data for the table `success` */
/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

前端页面

<!DOCTYPE html>
<html>
<!-- jQuery文件。务必在bootstrap.min.js 之前引入 -->
<script src="/js/jquery.min.js"></script>
<!-- 最新的 Bootstrap 核心 JavaScript 文件 -->
<script src="/js/bootstrap.min.js"></script>
<meta charset="UTF-8">
<head>
    <title>商品详情页</title>
    <style>
        .disappearBtn{
      
            display:none;
        }
        .showBtn{
      
            display:block;
        }
    </style>
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <!-- 引入 Bootstrap -->
    <link
            href="/css/seckill.css"
            rel="stylesheet">
    <script type="text/javascript">
        //重复执行success方法;生产环境下不建议使用,很耗费性能
        var seckillId="";
        $(function() {
      
            var localUrl= window.location.href;
            var args=localUrl.substr(localUrl.lastIndexOf("?")+1);
            if(!args.match("^seckillId=[0-9]*$")){
      
                alert("你的参数有问题,按默认查询seckill=1")
                args="seckillId=1";
            }
            alert(args);
            queryDetail(args);
            /*$("#seckill-btn").click(function() {
                $.ajax({
                    url: '/seckill/${seckill.seckillId}',
                    type: 'GET',
                    dataType: 'text',
                    success: function(result) {
                        alert(result);
                        console.log(result)
                    }
                });
            });*/
        });
        function startSeckill(){
      
            $.ajax({
      
                url:"http://www.easymall.com/seckills/"+seckillId,
                dataType:"json",
                type:"GET",
                success:function(data){
      
                    if(data.status==200){
      
                        alert("秒杀成功");
                    }else if(data.status=202){
      
                        alert("兄弟你没有登录吧,那我哪知道你是谁?");
                        window.location.href="./login.html";
                    }else{
      
                        alert("秒杀失败,原因未知,算你倒霉");
                        windwo.location.href="./seckill-list.html";
                    }
            },
                error:function(){
      
                    alert("发送失败");
                }
            });
        }
        function queryDetail(args){
      
            $.ajax({
      
                url:"http://www.easymall.com/seckills/detail?"+args,
                dataType:"json",
                type:"GET",
                success:function (data) {
      
                    if(data!=null){
      
                        $("#seckill_content").append("<h1>"+data.name+"<small>(秒杀价"+data.seckillPrice+"元)</small></h1>");
                        var nowTime=new Date().getTime();
                       /* alert(data.startTime);
                        alert(data.endTime);
                        alert(nowTime);*/
                        seckillId=data.seckillId;
                        countdown(nowTime,data.startTime,data.endTime);
                    }else{
      
                        alert("数据有问题");
                    }
                },
                error:function () {
      
                    alert("发送失败");
                }
            })
        }
        function countdown(nowTime,startTime,endTime){
      
            /*alert("开始计算");*/
            var countdownBtn = $('#countdown-btn');
            if(nowTime>endTime){
      
                countdownBtn.html('秒杀结束');
            }else if(nowTime<startTime){
      
                var killTime = new Date(startTime);
                countdownBtn.countdown(killTime,function(event){
      
                    var format = event.strftime('秒杀倒计时:%D天 %H时 %M分 %S秒');
                    countdownBtn.html(format);
                }).on('finish.countdown',function(){
      
                    //倒计时结束后回调事件
                    $('#countdown-div').addClass('disappearBtn');
                    $('#seckill-div').addClass('showBtn');
                })
            }else{
      
                //执行秒杀
                $('#countdown-div').addClass('disappearBtn');
                $('#seckill-div').addClass('showBtn');
            }
        }
        function success(sekillId){
      
            $.ajax({
      
                url: "http://www.easymall.com/seckills/"+seckillId+"/userPhone",
                type: 'GET',
                dataType: 'json',
                success: function(result) {
      
                    //console.log(result);
                    var i=0;
                    var str ="";
                    for(i=0;i<result.length;i++){
      
                        str += "非常感谢您参与本次秒杀活动,恭喜手机号为"+result[i]+"的幸运用户${seckill.sellPoint}<br/>";
                    }
                    document.getElementById("showSuccess").innerHTML=str;
                }
            });
        }
        window.setInterval("success(seckillId)",1000);
    </script>
</head>
<body>
<div class="container">
    <div class="panel panel-default text-center">
        <div id="seckill_content" class="pannel-heading">
        </div>
        <div class="panel-body"  id="countdown-div">
            <button type="button" class="btn  btn-lg btn-block btn-danger" id="countdown-btn"></button>
        </div>
        <div class="panel-body disappearBtn"  id="seckill-div">
            <button type="button" class="btn btn-primary btn-lg btn-block btn-info" id="seckill-btn" onclick="startSeckill()">开始进入商品秒杀环节</button>
        </div>
        <div id="showSuccess"></div>
    </div>
</div>
</body>
<!-- jQuery countDown倒计时插件 -->
<script src="/js/jquery.countdown.js"></script>
<!-- 计时交互逻辑 -->
<script>
</script>
</html>

js效果展示

function success(sekillId){
    
    $.ajax({
    
        url: "http://www.easymall.com/seckills/"+seckillId+"/userPhone",
        type: 'GET',
        dataType: 'json',
        success: function(result) {
    
            //console.log(result);
            var i=0;
            var str ="";
            for(i=0;i<result.length;i++){
    
                str += "非常感谢您参与本次秒杀活动,恭喜手机号为"+result[i]+"的幸运用户${seckill.sellPoint}<br/>";
            }
            document.getElementById("showSuccess").innerHTML=str;
        }
    });}

接口文档

js请求地址    http://www.easymall/seckills/{seckillId}
后台接收    /seckill/manage/{seckillId}
请求方式    Get
请求参数    Long seckillId 路径传参
返回数据    SysResult的返回对象
Integer status 200表示秒杀成功
String msg:ok表示成功
Object data:其他数据
备注    根据cookie获取用户信息,拼接用户数据到消息中,绑定一个秒杀的商品

后台代码

consumer

@Component
public class SeckillConsumer {
    
    /*
        创建秒杀中,消费端的消费逻辑代码
        1.参数就是消息msg userPhone+"/"+seckillId
        解析字符串 userPhone seckillId
        2.利用seckillId对该商品实现减库存操作,mybatis,insert update
        这种写操作会直接将 1/0 rows affected封装到返回结果
        条件: seckillId >0 时间限制 当前系统时间必须 大于start 小于
        end
        3.判断成功失败,记录日志,记录数据,success对象封装写入数据库
        为后续逻辑做准备
     */
    @Autowired(required = false)
    private SeckillMapper seckillMapper;
    @Autowired
    private StringRedisTemplate redisTemplate;
    @RabbitListener(queues = "seckill_q")
    public void seckill(String msg){
    
        //解析
        Long userPhone=Long.parseLong(msg.split("/")[0]);
        Long seckillId=Long.parseLong(msg.split("/")[1]);
        //执行数据库减库存操作
        Date nowTime=new Date();
        //在消费端真正减库存之前,先到redis执行incr命令
        String seckillKey="seckill_"+seckillId;
        Integer number = NumberUtils.NUMBER.get(seckillKey);
        Long increment = redisTemplate.opsForValue()
                .increment(seckillKey);
        if(increment>number){
    
            //已经从redis减库存完事了
            System.out.println(
                    "该用户:"
                    +userPhone
                    +"秒杀商品:"
                    +seckillId
                    +"失败,卖完了");
            return;
        }
        int result
                =seckillMapper.updateNumber(seckillId,nowTime);
        if(result==0){
    
            System.out.println(
                    "该用户:"
                    +userPhone
                    +"秒杀商品:"
                    +seckillId
                    +"失败,卖完了");
            return;
        }
        //成功减库存 封装数据,写入数据库success表格
        Success suc=new Success();
        suc.setCreateTime(nowTime);
        suc.setSeckillId(seckillId);
        suc.setUserPhone(userPhone);
        suc.setState(0);
        seckillMapper.insertSuccess(suc);
    }
}

controller

@RestController
@RequestMapping("/seckill/manage")
public class SeckillController {
    
    /**
     * 查询所有秒杀商品list列表
     * 请求地址:/list
     * 请求参数:null
     * 返回数据:List<Seckill>
     */
    @Autowired(required = false)
    private SeckillMapper seckillMapper;
    @RequestMapping("/list")
    public List<Seckill> list(){
    
        return seckillMapper.selectSeckills();
    }
    /**
     * 根据list列表中展示的商品,点击详情
     * 请求地址:/detail
     * 请求参数:Long seckillId
     * 返回数据:Seckill
     */
    @RequestMapping("/detail")
    public Seckill detail(Long seckillId){
    
        return seckillMapper.selectSeckillById(seckillId);
    }
    /**
     * 发起秒杀请求
     * 请求地址:/{sekcillId}
     * 请求参数:路径参数
     * 返回数据:sysResult 200
     */
    @Autowired
    private RabbitTemplate template;
    @Autowired
    private StringRedisTemplate redisTemplate;
    @RequestMapping("/{seckillId}")
    public SysResult startSeckill(@PathVariable String seckillId){
    
        //生产端发送消息,传递,谁秒杀了什么
        String userPhone="1330668"+(new Random().nextInt(9000)+1000);
        //模拟每次访问的人不一样,有可能一样
        String msg=userPhone+"/"+seckillId;
        /*限制一人秒杀商品多次
            使用msg 作为redis的key值,判断key值是否在redis存在
            决定是否限制本次请求秒杀向后发送消息
         */
        Long increment = redisTemplate.opsForValue().increment(msg);
        if(increment>1){
    
            //第二次以上相同用户秒杀同一个商品了
            return SysResult.build(201,"占便宜没够",null);
        }
        //将其发送给seckill_q
        template.convertAndSend(
                "seckill_ex",
                "seckill",
                msg);
        return SysResult.ok();
    }
    /*
        展示成功者信息
        请求地址:/{seckillId}/userPhone
        请求参数:路径参数Long seckillId
        返回数据:List<String> 包含了所有的成功者电话
     */
    @RequestMapping("/{seckillId}/userPhone")
    public List<String> successList(@PathVariable Long seckillId){
    
        return seckillMapper.selectUserphonesById(seckillId);
    }
}

mapper

public interface SeckillMapper {
    
    List<Seckill> selectSeckills();
    Seckill selectSeckillById(Long seckillId);
    int updateNumber(@Param("seckillId") Long seckillId,
                     @Param("nowTime")Date nowTime);
    void insertSuccess(Success suc);
    List<String> selectUserphonesById(Long seckillId);
}

mapper.xml

<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="cn.tedu.seckill.mapper.SeckillMapper">
    <!--查询秒杀商品列表-->
    <select id="selectSeckills" resultType="Seckill">
        select * from seckill;
    </select>
    <!--利用id查询单个商品-->
    <select id="selectSeckillById" resultType="Seckill">
        select * from seckill where seckill_id=#{seckillId};
    </select>
    <!--更新库存 减库存-->
    <update id="updateNumber">
        update seckill set number=number-1 where
        seckill_id =#{seckillId} AND
        number &gt; 0 and
        #{nowTime} &gt; start_time and
        #{nowTime} &lt; end_time;
    </update>
    <!--新增入库-->
    <insert id="insertSuccess">
        insert into success (seckill_id,user_phone,create_time,state)
        values (#{seckillId},#{userPhone},#{createTime},#{state});
    </insert>
    <!--展示成功信息-->
    <select id="selectUserphonesById" resultType="String">
        select user_phone from success
        where seckill_id=#{seckillId};
    </select>
</mapper>
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/u012386080/article/details/120172355

智能推荐

51单片机的中断系统_51单片机中断篇-程序员宅基地

文章浏览阅读3.3k次,点赞7次,收藏39次。CPU 执行现行程序的过程中,出现某些急需处理的异常情况或特殊请求,CPU暂时中止现行程序,而转去对异常情况或特殊请求进行处理,处理完毕后再返回现行程序断点处,继续执行原程序。void 函数名(void) interrupt n using m {中断函数内容 //尽量精简 }编译器会把该函数转化为中断函数,表示中断源编号为n,中断源对应一个中断入口地址,而中断入口地址的内容为跳转指令,转入本函数。using m用于指定本函数内部使用的工作寄存器组,m取值为0~3。该修饰符可省略,由编译器自动分配。_51单片机中断篇

oracle项目经验求职,网络工程师简历中的项目经验怎么写-程序员宅基地

文章浏览阅读396次。项目经验(案例一)项目时间:2009-10 - 2009-12项目名称:中驰别克信息化管理整改完善项目描述:项目介绍一,建立中驰别克硬件档案(PC,服务器,网络设备,办公设备等)二,建立中驰别克软件档案(每台PC安装的软件,财务,HR,OA,专用系统等)三,能过建立的档案对中驰别克信息化办公环境优化(合理使用ADSL宽带资源,对域进行调整,对文件服务器进行优化,对共享打印机进行调整)四,优化完成后..._网络工程师项目经历

LVS四层负载均衡集群-程序员宅基地

文章浏览阅读1k次,点赞31次,收藏30次。LVS:Linux Virtual Server,负载调度器,内核集成, 阿里的四层SLB(Server Load Balance)是基于LVS+keepalived实现。NATTUNDR优点端口转换WAN性能最好缺点性能瓶颈服务器支持隧道模式不支持跨网段真实服务器要求anyTunneling支持网络private(私网)LAN/WAN(私网/公网)LAN(私网)真实服务器数量High (100)High (100)真实服务器网关lvs内网地址。

「技术综述」一文道尽传统图像降噪方法_噪声很大的图片可以降噪吗-程序员宅基地

文章浏览阅读899次。https://www.toutiao.com/a6713171323893318151/作者 | 黄小邪/言有三编辑 | 黄小邪/言有三图像预处理算法的好坏直接关系到后续图像处理的效果,如图像分割、目标识别、边缘提取等,为了获取高质量的数字图像,很多时候都需要对图像进行降噪处理,尽可能的保持原始信息完整性(即主要特征)的同时,又能够去除信号中无用的信息。并且,降噪还引出了一..._噪声很大的图片可以降噪吗

Effective Java 【对于所有对象都通用的方法】第13条 谨慎地覆盖clone_为继承设计类有两种选择,但无论选择其中的-程序员宅基地

文章浏览阅读152次。目录谨慎地覆盖cloneCloneable接口并没有包含任何方法,那么它到底有什么作用呢?Object类中的clone()方法如何重写好一个clone()方法1.对于数组类型我可以采用clone()方法的递归2.如果对象是非数组,建议提供拷贝构造器(copy constructor)或者拷贝工厂(copy factory)3.如果为线程安全的类重写clone()方法4.如果为需要被继承的类重写clone()方法总结谨慎地覆盖cloneCloneable接口地目的是作为对象的一个mixin接口(详见第20_为继承设计类有两种选择,但无论选择其中的

毕业设计 基于协同过滤的电影推荐系统-程序员宅基地

文章浏览阅读958次,点赞21次,收藏24次。今天学长向大家分享一个毕业设计项目基于协同过滤的电影推荐系统项目运行效果:项目获取:https://gitee.com/assistant-a/project-sharing21世纪是信息化时代,随着信息技术和网络技术的发展,信息化已经渗透到人们日常生活的各个方面,人们可以随时随地浏览到海量信息,但是这些大量信息千差万别,需要费事费力的筛选、甄别自己喜欢或者感兴趣的数据。对网络电影服务来说,需要用到优秀的协同过滤推荐功能去辅助整个系统。系统基于Python技术,使用UML建模,采用Django框架组合进行设

随便推点

你想要的10G SFP+光模块大全都在这里-程序员宅基地

文章浏览阅读614次。10G SFP+光模块被广泛应用于10G以太网中,在下一代移动网络、固定接入网、城域网、以及数据中心等领域非常常见。下面易天光通信(ETU-LINK)就为大家一一盘点下10G SFP+光模块都有哪些吧。一、10G SFP+双纤光模块10G SFP+双纤光模块是一种常规的光模块,有两个LC光纤接口,传输距离最远可达100公里,常用的10G SFP+双纤光模块有10G SFP+ SR、10G SFP+ LR,其中10G SFP+ SR的传输距离为300米,10G SFP+ LR的传输距离为10公里。_10g sfp+

计算机毕业设计Node.js+Vue基于Web美食网站设计(程序+源码+LW+部署)_基于vue美食网站源码-程序员宅基地

文章浏览阅读239次。该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流项目运行环境配置:项目技术:Express框架 + Node.js+ Vue 等等组成,B/S模式 +Vscode管理+前后端分离等等。环境需要1.运行环境:最好是Nodejs最新版,我们在这个版本上开发的。其他版本理论上也可以。2.开发环境:Vscode或HbuilderX都可以。推荐HbuilderX;3.mysql环境:建议是用5.7版本均可4.硬件环境:windows 7/8/10 1G内存以上;_基于vue美食网站源码

oldwain随便写@hexun-程序员宅基地

文章浏览阅读62次。oldwain随便写@hexun链接:http://oldwain.blog.hexun.com/ ...

渗透测试-SQL注入-SQLMap工具_sqlmap拖库-程序员宅基地

文章浏览阅读843次,点赞16次,收藏22次。用这个工具扫描其它网站时,要注意法律问题,同时也比较慢,所以我们以之前写的登录页面为例子扫描。_sqlmap拖库

origin三图合一_神教程:Origin也能玩转图片拼接组合排版-程序员宅基地

文章浏览阅读1.5w次,点赞5次,收藏38次。Origin也能玩转图片的拼接组合排版谭编(华南师范大学学报编辑部,广州 510631)通常,我们利用Origin软件能非常快捷地绘制出一张单独的绘图。但是,我们在论文的撰写过程中,经常需要将多种科学实验图片(电镜图、示意图、曲线图等)组合在一张图片中。大多数人都是采用PPT、Adobe Illustrator、CorelDraw等软件对多种不同类型的图进行拼接的。那么,利用Origin软件能否实..._origin怎么把三个图做到一张图上

51单片机智能电风扇控制系统proteus仿真设计( 仿真+程序+原理图+报告+讲解视频)_电风扇模拟控制系统设计-程序员宅基地

文章浏览阅读4.2k次,点赞4次,收藏51次。51单片机智能电风扇控制系统仿真设计( proteus仿真+程序+原理图+报告+讲解视频)仿真图proteus7.8及以上 程序编译器:keil 4/keil 5 编程语言:C语言 设计编号:S0042。_电风扇模拟控制系统设计