RocketMQ一個新的消費組初次啟動時從何處開始消費呢?

目錄

@(本文目錄)

1、拋出問題

一個新的消費組訂閱一個已存在的Topic主題時,消費組是從該Topic的哪條消息開始消費呢?

首先翻閱DefaultMQPushConsumer的API時,setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)API映入眼帘,從字面意思來看是設置消費者從哪裡開始消費,正是解開該問題的”鑰匙“。ConsumeFromWhere枚舉類圖如下:

  • CONSUME_FROM_MAX_OFFSET
    從消費隊列最大的偏移量開始消費。
  • CONSUME_FROM_FIRST_OFFSET
    從消費隊列最小偏移量開始消費。
  • CONSUME_FROM_TIMESTAMP
    從指定的時間戳開始消費,默認為消費者啟動之前的30分鐘處開始消費。可以通過DefaultMQPushConsumer#setConsumeTimestamp。

是不是點小激動,還不快試試。

需求:新的消費組啟動時,從隊列最後開始消費,即只消費啟動后發送到消息服務器后的最新消息。

1.1 環境準備

本示例所用到的Topic路由信息如下:

Broker的配置如下(broker.conf)

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store
storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store/commitlog
namesrvAddr=127.0.0.1:9876
autoCreateTopicEnable=false
mapedFileSizeCommitLog=10240
mapedFileSizeConsumeQueue=2000

其中重點修改了如下兩個參數:

  • mapedFileSizeCommitLog
    單個commitlog文件的大小,這裏使用10M,方便測試用。
  • mapedFileSizeConsumeQueue
    單個consumequeue隊列長度,這裏使用1000,表示一個consumequeue文件中包含1000個條目。

1.2 消息發送者代碼

public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for (int i = 0; i < 300; i++) {
        try {
            Message msg = new Message("TopicTest" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
    }
    producer.shutdown();
}

通過上述,往TopicTest發送300條消息,發送完畢后,RocketMQ Broker存儲結構如下:

1.3 消費端驗證代碼

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_01");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

執行上述代碼后,按照期望,應該是不會消費任何消息,只有等生產者再發送消息后,才會對消息進行消費,事實是這樣嗎?執行效果如圖所示:

令人意外的是,竟然從隊列的最小偏移量開始消費了,這就“尷尬”了。難不成是RocketMQ的Bug。帶着這個疑問,從源碼的角度嘗試來解讀該問題,並指導我們實踐。

2、探究CONSUME_FROM_MAX_OFFSET實現原理

對於一個新的消費組,無論是集群模式還是廣播模式都不會存儲該消費組的消費進度,可以理解為-1,此時就需要根據DefaultMQPushConsumer#consumeFromWhere屬性來決定其從何處開始消費,首先我們需要找到其對應的處理入口。我們知道,消息消費者從Broker服務器拉取消息時,需要進行消費隊列的負載,即RebalanceImpl。

溫馨提示:本文不會詳細介紹RocketMQ消息隊列負載、消息拉取、消息消費邏輯,只會展示出通往該問題的簡短流程,如想詳細了解消息消費具體細節,建議購買筆者出版的《RocketMQ技術內幕》書籍。

RebalancePushImpl#computePullFromWhere

public long computePullFromWhere(MessageQueue mq) {
        long result = -1;                                                                                                                                                                                                                  // @1
        final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();    
        final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
        switch (consumeFromWhere) {
            case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
            case CONSUME_FROM_MIN_OFFSET:
            case CONSUME_FROM_MAX_OFFSET:
            case CONSUME_FROM_LAST_OFFSET: {                                                                                                                                                                // @2
               // 省略部分代碼
                break;
            }
            case CONSUME_FROM_FIRST_OFFSET: {                                                                                                                                                              // @3
                // 省略部分代碼
                break;
            }
            case CONSUME_FROM_TIMESTAMP: {                                                                                                                                                                  //@4
                // 省略部分代碼
                break;
            }
            default:
                break;
        }
        return result;                                                                                                                                                                                                                  // @5
    }

代碼@1:先解釋幾個局部變量。

  • result
    最終的返回結果,默認為-1。
  • consumeFromWhere
    消息消費者開始消費的策略,即CONSUME_FROM_LAST_OFFSET等。
  • offsetStore
    offset存儲器,消費組消息偏移量存儲實現器。

代碼@2:CONSUME_FROM_LAST_OFFSET(從隊列的最大偏移量開始消費)的處理邏輯,下文會詳細介紹。

代碼@3:CONSUME_FROM_FIRST_OFFSET(從隊列最小偏移量開始消費)的處理邏輯,下文會詳細介紹。

代碼@4:CONSUME_FROM_TIMESTAMP(從指定時間戳開始消費)的處理邏輯,下文會詳細介紹。

代碼@5:返回最後計算的偏移量,從該偏移量出開始消費。

2.1 CONSUME_FROM_LAST_OFFSET計算邏輯

case CONSUME_FROM_LAST_OFFSET: {
    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);   // @1
    if (lastOffset >= 0) {                                                                                                             // @2
        result = lastOffset;
    }
    // First start,no offset
    else if (-1 == lastOffset) {                                                                                                  // @3
        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {               
            result = 0L;
        } else {
            try {
                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);                     
            } catch (MQClientException e) {                                                                              // @4
                result = -1;
            }
        }
    } else {
        result = -1;    
    }
    break;
}

代碼@1:使用offsetStore從消息消費進度文件中讀取消費消費進度,本文將以集群模式為例展開。稍後詳細分析。

代碼@2:如果返回的偏移量大於等於0,則直接使用該offset,這個也能理解,大於等於0,表示查詢到有效的消息消費進度,從該有效進度開始消費,但我們要特別留意lastOffset為0是什麼場景,因為返回0,並不會執行CONSUME_FROM_LAST_OFFSET(語義)。

代碼@3:如果lastOffset為-1,表示當前並未存儲其有效偏移量,可以理解為第一次消費,如果是消費組重試主題,從重試隊列偏移量為0開始消費;如果是普通主題,則從隊列當前的最大的有效偏移量開始消費,即CONSUME_FROM_LAST_OFFSET語義的實現。

代碼@4:如果從遠程服務拉取最大偏移量拉取異常或其他情況,則使用-1作為第一次拉取偏移量。

分析,上述執行的現象,雖然設置的是CONSUME_FROM_LAST_OFFSET,但現象是從隊列的第一條消息開始消費,根據上述源碼的分析,只有從消費組消費進度存儲文件中取到的消息偏移量為0時,才會從第一條消息開始消費,故接下來重點分析消息消費進度存儲器(OffsetStore)在什麼情況下會返回0。

接下來我們將以集群模式來查看一下消息消費進度的查詢邏輯,集群模式的消息進度存儲管理器實現為:
RemoteBrokerOffsetStore,最終Broker端的命令處理類為:ConsumerManageProcessor。

ConsumerManageProcessor#queryConsumerOffset
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
    final QueryConsumerOffsetResponseHeader responseHeader =
        (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
    final QueryConsumerOffsetRequestHeader requestHeader =
        (QueryConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

    long offset =
        this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());    // @1

    if (offset >= 0) {                                                                                                                                          // @2
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    } else {                                                                                                                                                       // @3
        long minOffset =
            this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
                requestHeader.getQueueId());                                                                                                     // @4
        if (minOffset <= 0
            && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(                                // @5
            requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
            responseHeader.setOffset(0L);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        } else {                                                                                                                                                 // @6
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        }
    }
    return response;
}

代碼@1:從消費消息進度文件中查詢消息消費進度。

代碼@2:如果消息消費進度文件中存儲該隊列的消息進度,其返回的offset必然會大於等於0,則直接返回該偏移量該客戶端,客戶端從該偏移量開始消費。

代碼@3:如果未從消息消費進度文件中查詢到其進度,offset為-1。則首先獲取該主題、消息隊列當前在Broker服務器中的最小偏移量(@4)。如果小於等於0(返回0則表示該隊列的文件還未曾刪除過)並且其最小偏移量對應的消息存儲在內存中而不是存在磁盤中,則返回偏移量0,這就意味着ConsumeFromWhere中定義的三種枚舉類型都不會生效,直接從0開始消費,到這裏就能解開其謎團了(@5)。

代碼@6:如果偏移量小於等於0,但其消息已經存儲在磁盤中,此時返回未找到,最終RebalancePushImpl#computePullFromWhere中得到的偏移量為-1。

看到這裏,大家應該能回答文章開頭處提到的問題了吧?

看到這裏,大家應該明白了,為什麼設置的CONSUME_FROM_LAST_OFFSET,但消費組是從消息隊列的開始處消費了吧,原因就是消息消費進度文件中並沒有找到其消息消費進度,並且該隊列在Broker端的最小偏移量為0,說的更直白點,consumequeue/topicName/queueNum的第一個消息消費隊列文件為00000000000000000000,並且消息其對應的消息緩存在Broker端的內存中(pageCache),其返回給消費端的偏移量為0,故會從0開始消費,而不是從隊列的最大偏移量處開始消費。

為了知識體系的完備性,我們順便來看一下其他兩種策略的計算邏輯。

2.2 CONSUME_FROM_FIRST_OFFSET

case CONSUME_FROM_FIRST_OFFSET: {
    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);   // @1
    if (lastOffset >= 0) {    // @2
        result = lastOffset;
    } else if (-1 == lastOffset) {  // @3
        result = 0L;
    } else {                                  
        result = -1;                    // @4
    }
    break;
}

從隊列的開始偏移量開始消費,其計算邏輯如下:
代碼@1:首先通過偏移量存儲器查詢消費隊列的消費進度。

代碼@2:如果大於等於0,則從當前該偏移量開始消費。

代碼@3:如果遠程返回-1,表示並沒有存儲該隊列的消息消費進度,從0開始。

代碼@4:否則從-1開始消費。

2.4 CONSUME_FROM_TIMESTAMP

從指定時戳后的消息開始消費。

case CONSUME_FROM_TIMESTAMP: {
    ong lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);   // @1
    if (lastOffset >= 0) {                                                                                                            // @2
        result = lastOffset;
    } else if (-1 == lastOffset) {                                                                                                 // @3
        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            try {
                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
            } catch (MQClientException e) {
                result = -1;
            }
        } else {
            try {
                long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                    UtilAll.YYYYMMDDHHMMSS).getTime();
                result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
            } catch (MQClientException e) {
                result = -1;
            }
        }
    } else {
        result = -1;
    }
    break;
}

其基本套路與CONSUME_FROM_LAST_OFFSET一樣:
代碼@1:首先通過偏移量存儲器查詢消費隊列的消費進度。

代碼@2:如果大於等於0,則從當前該偏移量開始消費。

代碼@3:如果遠程返回-1,表示並沒有存儲該隊列的消息消費進度,如果是重試主題,則從當前隊列的最大偏移量開始消費,如果是普通主題,則根據時間戳去Broker端查詢,根據查詢到的偏移量開始消費。

原理就介紹到這裏,下面根據上述理論對其進行驗證。

3、猜想與驗證

根據上述理論分析我們得知設置CONSUME_FROM_LAST_OFFSET但並不是從消息隊列的最大偏移量開始消費的“罪魁禍首”是因為消息消費隊列的最小偏移量為0,如果不為0,則就會符合預期,我們來驗證一下這個猜想。
首先我們刪除commitlog目錄下的文件,如圖所示:

其消費隊列截圖如下:

消費端的驗證代碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_02");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

運行結果如下:

並沒有消息存在的消息,符合預期。

4、解決方案

如果在生產環境下,一個新的消費組訂閱一個已經存在比較久的topic,設置CONSUME_FROM_MAX_OFFSET是符合預期的,即該主題的consumequeue/{queueNum}/fileName,fileName通常不會是00000000000000000000,如是是上面文件名,想要實現從隊列的最後開始消費,該如何做呢?那就走自動創建消費組的路子,執行如下命令:

./mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g my_consumer_05

//克隆一個訂閱了該topic的消費組消費進度
./mqadmin cloneGroupOffset -n 127.0.0.1:9876 -s my_consumer_01 -d my_consumer_05 -t TopicTest

//重置消費進度到當前隊列的最大值
./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g my_consumer_05 -t TopicTest -s -1

按照上上述命令后,即可實現其目的。

您都看到這裏了,麻煩幫忙點個贊,謝謝您的認可與鼓勵。

作者介紹:
丁威,《RocketMQ技術內幕》作者,RocketMQ 社區佈道師,公眾號: 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。歡迎加入我的知識星球,構建一個高質量的技術交流社群。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

Spring框架AOP學習總結(下)

目錄

@
在中主要講的是一些Spring的概述、Spring工廠、Spring屬性注入以及IOC入門,其中最重要的是IOC,上一篇中IOC大概講的小結一下:

然後呢這一篇中主要講一下Spring中除了IOC之外的另一個重要的核心:AOP,在Spring中IOC也好,AOP也好,都必須會二者的XML開發以及註解開發,也就是說IOC和AOP的XML開發以及註解開發都要掌握

1、 AOP 的概述

從專業的角度來講(千萬不要問我有多專業,度娘是我表鍋不對是表嫂QAQ):

在軟件業,AOP為Aspect Oriented Programming的縮寫,意為:面向切面編程,通過預編譯方式和運行期動態代理實現程序功能的統一維護的一種技術。AOP是OOP的延續,是軟件開發中的一個熱點,也是Spring框架中的一個重要內容,是函數式編程的一種衍生范型。利用AOP可以對業務邏輯的各個部分進行隔離,從而使得業務邏輯各部分之間的耦合度降低,提高程序的可重用性,同時提高了開發的效率。

從通俗易懂且不失風趣的角度來講:(來自武哥文章)

面向切面編程的目標就是分離關注點。什麼是關注點呢?就是你要做的事,就是關注點。假如你是個公子哥,沒啥人生目標,天天就是衣來伸手,飯來張口,整天只知道玩一件事!那麼,每天你一睜眼,就光想着吃完飯就去玩(你必須要做的事),但是在玩之前,你還需要穿衣服、穿鞋子、疊好被子、做飯等等等等事情,這些事情就是你的關注點,但是你只想吃飯然後玩,那麼怎麼辦呢?這些事情通通交給別人去干。在你走到飯桌之前,有一個專門的僕人A幫你穿衣服,僕人B幫你穿鞋子,僕人C幫你疊好被子,僕人C幫你做飯,然後你就開始吃飯、去玩(這就是你一天的正事),你幹完你的正事之後,回來,然後一系列僕人又開始幫你干這個干那個,然後一天就結束了!
AOP的好處就是你只需要干你的正事,其它事情別人幫你干。也許有一天,你想裸奔,不想穿衣服,那麼你把僕人A解僱就是了!也許有一天,出門之前你還想帶點錢,那麼你再雇一個僕人D專門幫你干取錢的活!這就是AOP。每個人各司其職,靈活組合,達到一種可配置的、可插拔的程序結構。
從Spring的角度看,AOP最大的用途就在於提供了事務管理的能力。事務管理就是一個關注點,你的正事就是去訪問數據庫,而你不想管事務(太煩),所以,Spring在你訪問數據庫之前,自動幫你開啟事務,當你訪問數據庫結束之後,自動幫你提交/回滾事務!

1、1 為什麼學習 AOP

Spring 的 AOP 的由來:AOP 最早由 AOP 聯盟的組織提出的,制定了一套規範.Spring 將 AOP 思想引入到框架中,必須遵守 AOP 聯盟的規範.

Aop解決實際開發中的一些問題:

  • AOP 解決 OOP 中遇到的一些問題.是 OOP 的延續和擴展.

對程序進行增強:不修改源碼的情況下:

  • AOP 可以進行權限校驗,日誌記錄,性能監控,事務控制.

1、2 AOP底層實現: 代理機制(了解)

Spring 的 AOP 的底層用到兩種代理機制:

  • JDK 的動態代理 :針對實現了接口的類產生代理.
  • Cglib 的動態代理 :針對沒有實現接口的類產生代理. 應用的是底層的字節碼增強的技術 生成當前類的子類對象

spring底層會完成自動代理,實現了接口的類默認使用的是JDK 的動態代理,相反的,沒有實現接口的類默認使用的是Cglib 的動態代理 ,底層代碼可以不懂但這個概念一定要知道,不然會被鄙視的,O(∩_∩)O哈哈~,下面是底層代碼,有興趣的可以了解了解。

JDK 動態代理增強一個類中方法:

public class MyJDKProxy implements InvocationHandler {
        private UserDao userDao;

        public MyJDKProxy(UserDao userDao) {
            this.userDao = userDao;
        }

        // 編寫工具方法:生成代理:
        public UserDao createProxy() {
            UserDao userDaoProxy = (UserDao) Proxy.newProxyInstance(userDao
                    .getClass().getClassLoader(), userDao.getClass()
                    .getInterfaces(), this);
            return userDaoProxy;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args)
                throws Throwable {
            if ("save".equals(method.getName())) {
                System.out.println("權限校驗================");
            }
            return method.invoke(userDao, args);
        }
    }

Cglib 動態代理增強一個類中的方法:

public class MyCglibProxy implements MethodInterceptor {
        private CustomerDao customerDao;

        public MyCglibProxy(CustomerDao customerDao) {
            this.customerDao = customerDao;
        }

        // 生成代理的方法:
        public CustomerDao createProxy() {
            // 創建 Cglib 的核心類:
            Enhancer enhancer = new Enhancer();
            // 設置父類:
            enhancer.setSuperclass(CustomerDao.class);
            // 設置回調:
            enhancer.setCallback(this);
            // 生成代理:
            CustomerDao customerDaoProxy = (CustomerDao) enhancer.create();
            return customerDaoProxy;
        }

        @Override
        public Object intercept(Object proxy, Method method, Object[] args,
                MethodProxy methodProxy) throws Throwable {
            if ("delete".equals(method.getName())) {
                Object obj = methodProxy.invokeSuper(proxy, args);
                System.out.println("日誌記錄================");
                return obj;
            }
            return methodProxy.invokeSuper(proxy, args);
        }
    }

2、 Spring 基於AspectJ 進行 AOP 的開發入門(XML 的方式):

首先,Spring為什麼不直接進行Spring的AOP開發呢,而要基於Aspectj呢,是因為,Spring自己的AOP開發實現方式(傳統的AOP開發)繁瑣且複雜,效率極低,於是傳統的AOP開發基本上棄用了,相反Aspectj的AOP開發效率高,所以AOP開發一般是Spring 的基於 AspectJ 的 AOP 開發。

2.1 AOP 的開發中的相關術語:

Aop是一種非常高深的思想,當然會有非常專業的相關術語了(這彎繞的,你打幾分?)

從專業的角度角度概述定義(相對來說比較枯燥不易理解):

Joinpoint(連接點):所謂連接點是指那些被攔截到的點。在 spring 中,這些點指的是方法,因為 spring 只
支持方法類型的連接點.
Pointcut(切入點):所謂切入點是指我們要對哪些 Joinpoint 進行攔截的定義.
Advice(通知/增強):所謂通知是指攔截到 Joinpoint 之後所要做的事情就是通知.通知分為前置通知,後置
通知,異常通知,最終通知,環繞通知(切面要完成的功能)
Introduction(引介):引介是一種特殊的通知在不修改類代碼的前提下, Introduction 可以在運行期為類
動態地添加一些方法或 Field.
Target(目標對象):代理的目標對象
Weaving(織入):是指把增強應用到目標對象來創建新的代理對象的過程.
spring 採用動態代理織入,而 AspectJ 採用編譯期織入和類裝在期織入
Proxy(代理):一個類被 AOP 織入增強后,就產生一個結果代理類
Aspect(切面): 是切入點和通知(引介)的結合

基於專業的角度實例分析(相對來說易理解,什麼?畫質差?咳咳…1080p藍光畫質…哎哎哎..大哥..別打…別打…別打臉):

2.2引入相應的 jar 包

引入jar包:基礎六個jar包、AOP聯盟jar包、spring的AOPjar包、aspectJ的jar包、spring整合aspectj的jar包

  • spring 的傳統 AOP 的開發的包
    spring-aop-4.2.4.RELEASE.jar
    com.springsource.org.aopalliance-1.0.0.jar

  • aspectJ 的開發包:
    com.springsource.org.aspectj.weaver-1.6.8.RELEASE.jar
    spring-aspects-4.2.4.RELEASE.jar

    2.3 引入 Spring 的配置文件

    引入 AOP 約束:

 <beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="
http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop 
http://www.springframework.org/schema/aop/spring-aop.xsd">
</beans>

2.4 編寫目標類

創建接口和類:

    public interface OrderDao {
        public void save();

        public void update();

        public void delete();

        public void find();
    }

    public class OrderDaoImpl implements OrderDao {
        @Override
        public void save() {
            System.out.println("保存訂單...");
        }

        @Override
        public void update() {
            System.out.println("修改訂單...");
        }

        @Override
        public void delete() {
            System.out.println("刪除訂單...");
        }

        @Override
        public void find() {
            System.out.println("查詢訂單...");
        }
    }

2.5 目標類的XML配置

<!-- 目標類配置:被增強的類 --> 
<bean id="orderDao" class="com.gx.spring.demo3.OrderDaoImpl"></bean>

2.6 整合 Junit 單元測試

前提:引入 spring-test.jar 測試的jar包,整合 Junit 單元測試之後就不需要每次都重複註冊工廠,只要固定格式在測試類上寫兩個註解,需要的屬性直接注入,之後只關心自己的測試類即可

//固定註解寫法(前提:引入 spring-test.jar 測試的jar包)
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration("classpath:applicationContext.xml")
    public class SpringDemo3 {
        @Resource(name = "orderDao")  //需要的屬性直接注入(前提:引入 spring-test.jar 測試的jar包)
        private OrderDao orderDao;

        @Test
        public void demo1() {
            orderDao.save();
            orderDao.update();
            orderDao.delete();
            orderDao.find();
        }
    }

運行demo出現如下效果:

2.7 通知類型

到這裏,就需要需要對通知類型了解一下(前三者常用):

前置通知 :在目標方法執行之前執行.

後置通知 :在目標方法執行之後執行

如果要獲得後置通知中的返回值,必須注意的是:

環繞通知 :在目標方法執行前和執行后執行

異常拋出通知:在目標方法執行出現 異常的時候 執行
最終通知 :無論目標方法是否出現異常 最終通知都會 執行.

通知類型XML配置

2.8 切入點表達式

execution(表達式)

表達式 : [方法訪問修飾符] 方法返回值 包名.類名.方法名(方法的參數)

切入點表達式所以就是execution( [方法訪問修飾符] 方法返回值 包名.類名.方法名(方法的參數))

其中 [ ] 中的方法訪問修飾符可有可無

切入點表達式各類型例子:

public * com.gx.spring.dao. * .*(..)
com.gx.spring.dao.*.*(..)
com.gx.spring.dao.UserDao+.*(..)
com.gx.spring.dao..*.*(..)

2.9 編寫一個切面類

好了,了解了通知類型以及切入點表達式之後就可以來 編寫一個切面類玩起來了QAQ

public class MyAspectXml {
    // 前置增強
    public void before(){
       System.out.println("前置增強===========");
} }

2.10 配置完成增強

<!-- 配置切面類 --> 
<bean id="myAspectXml" class="com.gx.spring.demo3.MyAspectXml"></bean>
<!-- 進行 aop 的配置 --> 
<aop:config>
<!-- 配置切入點表達式:哪些類的哪些方法需要進行增強 -->
 <aop:pointcut expression="execution(* com.gx.spring.demo3.OrderDao.save(..))" id="pointcut1"/>
<!-- 配置切面 --> 
<aop:aspect ref="myAspectXml"> 
    <aop:before method="before" pointcut-ref="pointcut1"/>
</aop:aspect>
</aop:config>

需要注意的點我都規劃出來了(不用誇我,我知道我長得帥QnQ)

2.11 其他的增強的配置:

<!-- 配置切面類 -->
 <bean id="myAspectXml" class="com.gx.demo3.MyAspectXml"></bean>
    <!-- 進行 aop 的配置 -->
 <aop:config>
    <!-- 配置切入點表達式:哪些類的哪些方法需要進行增強 -->
     <aop:pointcut expression="execution(* com.gx.spring.demo3.*Dao.save(..))" id="pointcut1"/>
     <aop:pointcut expression="execution(* com.gx.spring.demo3.*Dao.delete(..))" id="pointcut2"/>
     <aop:pointcut expression="execution(* com.gx.spring.demo3.*Dao.update(..))" id="pointcut3"/>
     <aop:pointcut expression="execution(* com.gx.spring.demo3.*Dao.find(..))" id="pointcut4"/>
    <!-- 配置切面 --> 
    <aop:aspect ref="myAspectXml">
       <aop:before method="before" pointcut-ref="pointcut1"/>
       <aop:after-returning method="afterReturing"pointcut-ref="pointcut2"/>
       <aop:around method="around" pointcut-ref="pointcut3"/>
       <aop:after-throwing method="afterThrowing" pointcut-ref="pointcut4"/>
       <aop:after method="after" pointcut-ref="pointcut4"/>
    </aop:aspect>
</aop:config>

3、Spring 基於AspectJ 進行 AOP 的開發入門(註解的方式):

3.1創建項目,引入jar包

引入的jar包如下:

3.2引入配置文件

3.3編寫目標類並配置

編寫目標類:

package com.gx.spring.demo1;

public class OrderDao {

    public void save(){
        System.out.println("保存訂單...");
    }
    public void update(){
        System.out.println("修改訂單...");
    }
    public String delete(){
        System.out.println("刪除訂單...");
        return "鄢寒";
    }
    public void find(){
        System.out.println("查詢訂單...");
    }
}

XML配置:

<!-- 配置目標類 -->
    <bean id="orderDao" class="com.gx.spring.demo1.OrderDao">

    </bean>

3.4編寫切面類並配置

編寫切面類

package com.gx.spring.demo1;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;

/**
 * 切面類:註解的切面類
 * @author jt
 */
public class MyAspectAnno {

    public void before(){
        System.out.println("前置增強===========");
    }
}

XML配置:

<!-- 配置切面類 -->
    <bean id="myAspect" class="com.gx.spring.demo1.MyAspectAnno">
    
    </bean>

3.5使用註解的AOP對象目標類進行增強

1、在配置文件中打開註解的AOP開發

<!-- 在配置文件中開啟註解的AOP的開發 -->
    <aop:aspectj-autoproxy/>

2、在切面類上使用註解
在類上使用@Aspect註解代表這是一個切面類
在方法上注入屬性@Before(execution表達式)代表前置增強

@Aspect
public class MyAspectAnno {

    @Before(value="execution(* com.gx.spring.demo1.OrderDao.save(..))")
    public void before(){
        System.out.println("前置增強===========");
    }
}

3.6編寫測試類

package com.gx.spring.demo1;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Spring的AOP的註解開發
 *
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext.xml")
public class SpringDemo1 {
    @Resource(name="orderDao")
    private static OrderDao orderDao;
    
    public static void main(String[] args) {
        
            orderDao.save();
            orderDao.update();
            orderDao.delete();
            orderDao.find();
        
    }
    
}

測試結果:

4、Spring的註解的AOP的通知類型

4.1@Before :前置通知

@Aspect
public class MyAspectAnno {

    @Before(value="execution(* com.gx.spring.demo1.OrderDao.save(..))")
    public void before(){
        System.out.println("前置增強===========");
    }
}

4.2@AfterReturning :後置通知

後置通知可以獲取方法返回值

// 後置通知:
    @AfterReturning(value="execution(* com.gx.spring.demo1.OrderDao.save(..))")
    public void afterReturning(Object result){
        System.out.println("後置增強==========="+result);
    }

借用一下XML方式的圖,意思意思啦,意思還是那個意思QnQ

4.3@Around :環繞通知

// 環繞通知:
    @Around(value="execution(* com.gx.spring.demo1.OrderDao.save(..))")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable{
        System.out.println("環繞前增強==========");
        Object obj  = joinPoint.proceed();
        System.out.println("環繞后增強==========");
        return obj;
    }

4.4@AfterThrowing :異常拋出通知

測試前記得製造出個異常qnq

// 異常拋出通知:
    @AfterThrowing(value="execution(* com.gx.spring.demo1.OrderDao.save(..))" throwing="e")
    public void afterThrowing(Throwable e){
        System.out.println("異常拋出增強========="+e.getMessage());
    }

4.5@After :最終通知

// 最終通知
    @After(value="execution(* com.gx.spring.demo1.OrderDao.save(..))")
    public void after(){
        System.out.println("最終增強============");
    }

5、Spring的註解的AOP的切入點的配置

首先,我們發現在Spring 基於AspectJ 進行 AOP 的開發入門(註解的方式)的過程中如果方法過多,通知過多並且作用於一個方法,需求一改變就需要更改相應的源代碼,為了更好的維護,於是有了AOP的切入點的配置,AOP的切入點的配置能很好地決絕改問題!只需要管理AOP的切入點的配置即可!

具體代碼如下:

package com.gx.spring.demo1;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;

/**
 * 切面類:註解的切面類
 * @author jt
 */
@Aspect
public class MyAspectAnno {
    // 前置通知:
    @Before(value="MyAspectAnno.pointcut2()")
    public void before(){
        System.out.println("前置增強===========");
    }
    
    // 後置通知:
    @AfterReturning(value="MyAspectAnno.pointcut4()",returning="result")
    public void afterReturning(Object result){
        System.out.println("後置增強==========="+result);
    }
    
    // 環繞通知:
    @Around(value="MyAspectAnno.pointcut3()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable{
        System.out.println("環繞前增強==========");
        Object obj  = joinPoint.proceed();
        System.out.println("環繞后增強==========");
        return obj;
    }
    
    // 異常拋出通知:
    @AfterThrowing(value="MyAspectAnno.pointcut1()",throwing="e")
    public void afterThrowing(Throwable e){
        System.out.println("異常拋出增強========="+e.getMessage());
    }
    
    // 最終通知
    @After(value="MyAspectAnno.pointcut1()")
    public void after(){
        System.out.println("最終增強============");
    }
    
    // 切入點註解:
    @Pointcut(value="execution(* com.gx.spring.demo1.OrderDao.find(..))")
    private void pointcut1(){}
    @Pointcut(value="execution(* com.gx.spring.demo1.OrderDao.save(..))")
    private void pointcut2(){}
    @Pointcut(value="execution(* com.gx.spring.demo1.OrderDao.update(..))")
    private void pointcut3(){}
    @Pointcut(value="execution(* com.gx.spring.demo1.OrderDao.delete(..))")
    private void pointcut4(){}
}

如果本文對你有一點點幫助,那麼請點個讚唄,謝謝~

最後,若有不足或者不正之處,歡迎指正批評,感激不盡!如果有疑問歡迎留言,絕對第一時間回復!

歡迎各位關注我的公眾號,一起探討技術,嚮往技術,追求技術,說好了來了就是盆友喔…

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品在網路上成為最夯、最多人討論的話題?

※高價收購3C產品,價格不怕你比較

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

一文帶你深入了解 redis 複製技術及主從架構

主從架構可以說是互聯網必備的架構了,第一是為了保證服務的高可用,第二是為了實現讀寫分離,你可能熟悉我們常用的 MySQL 數據庫的主從架構,對於我們 redis 來說也不意外,redis 數據庫也有各種各樣的主從架構方式,在主從架構中會涉及到主節點與從節點之間的數據同步,這個數據同步的過程在 redis 中叫做複製,這在篇文章中,我們詳細的聊一聊 redis 的複製技術和主從架構 ,本文主要有以下內容:

  • 主從架構環境搭建
    • 主從架構的建立方式
    • 主從架構的斷開
  • 複製技術的原理
    • 數據同步過程
    • 心跳檢測
  • 主從拓撲架構
    • 一主一從
    • 一主多從
    • 樹狀結構

主從環境搭建

redis 的實例在默認的情況下都是主節點,所以我們需要修改一些配置來搭建主從架構,redis 的主從架構搭建還是比較簡單的,redis 提供了三種方式來搭建主從架構,在後面我們將就介紹,在介紹之前我們要先了解主從架構的特性:在主從架構中有一個主節點(master)和最少一個從節點(slave),並且數據複製是單向的,只能從主節點複製到從節點,不能由從節點到主節點。

主從架構的建立方式

主從架構的建立有以下三種方式:

  • 在 Redis.conf 配置文件中加入 slaveof {masterHost} {masterPort} 命令,隨 Redis 實例的啟動生效
  • 在 redis-server 啟動命令后加入 –slaveof {masterHost} {masterPort} 參數
  • 在 redis-cli 交互窗口下直接使用命令:slaveof {masterHost} {masterPort}

上面三種方式都可以搭建 Redis 主從架構,我們以第一種方式來演示,其他兩種方式自行嘗試,由於是演示,所以就在本地啟動兩個 Redis 實例,並不在多台機器上啟動 redis 的實例了,我們準備一個端口 6379 的主節點實例,準備一個端口 6480 從節點的實例,端口 6480 的 redis 實例配置文件取名為 6480.conf 並且在裏面添加 slaveof 語句,在配置文件最後加入如下一條語句

slaveof 127.0.0.1 6379

分別啟動兩個 redis 實例,啟動之後他們會自動建立主從關係,關於這背後的原理,我們後面在詳細的聊一聊,先來驗證一下我們的主從架構是否搭建成功,我們先在 6379 master 節點上新增一條數據:

然後再 6480 slave 節點上獲取該數據:

可以看出我們在 slave 節點上已經成功的獲取到了在 master 節點新增的值,說明主從架構已經搭建成功了,我們使用 info replication 命令來查看兩個節點的信息,先來看看主節點的信息

可以看出 6379 端口的實例 role 為 master,有一個正在連接的實例,還有其他運行的信息,我們再來看看 6480 端口的 redis 實例信息

可以看出兩個節點之間相互記錄著對象的信息,這些信息在數據複製時候將會用到。在這裡有一點需要說明一下,默認情況下 slave 節點是只讀的,並不支持寫入,也不建議開啟寫入,我們可以驗證一下,在 6480 實例上寫入一條數據

127.0.0.1:6480> set x 3
(error) READONLY You can't write against a read only replica.
127.0.0.1:6480> 

提示只讀,並不支持寫入操作,當然我們也可以修改該配置,在配置文件中 replica-read-only yes 配置項就是用來控制從服務器只讀的,為什麼只能只讀?因為我們知道複製是單向的,數據只能由 master 到 slave 節點,如果在 salve 節點上開啟寫入的話,那麼修改了 slave 節點的數據, master 節點是感知不到的,slave 節點的數據並不能複製到 master 節點上,這樣就會造成數據不一致的情況,所以建議 slave 節點只讀

主從架構的斷開

主從架構的斷開同樣是 slaveof 命令,在從節點上執行 slaveof no one 命令就可以與主節點斷開追隨關係,我們在 6480 節點上執行 slaveof no one 命令

127.0.0.1:6480> slaveof no one
OK
127.0.0.1:6480> info replication
# Replication
role:master
connected_slaves:0
master_replid:a54f3ba841c67762d6c1e33456c97b94c62f6ac0
master_replid2:e5c1ab2a68064690aebef4bd2bd4f3ddfba9cc27
master_repl_offset:4367
second_repl_offset:4368
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:4367
127.0.0.1:6480> 

執行完 slaveof no one 命令之後,6480 節點的角色立馬恢復成了 master ,我們再來看看時候還和 6379 實例連接在一起,我們在 6379 節點上新增一個 key-value

127.0.0.1:6379> set y 3
OK

在 6480 節點上 get y

127.0.0.1:6480> get y
(nil)
127.0.0.1:6480> 

在 6480 節點上獲取不到 y ,因為 6480 節點已經跟 6379 節點斷開的聯繫,不存在主從關係了,slaveof 命令不僅能夠斷開連接,還能切換主服務器,使用命令為 slaveof {newMasterIp} {newMasterPort},我們讓 6379 成為 6480 的從節點, 在 6379 節點上執行 slaveof 127.0.0.1 6480 命令,我們在來看看 6379 的 info replication

127.0.0.1:6379> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6480
master_link_status:up
master_last_io_seconds_ago:2
master_sync_in_progress:0
slave_repl_offset:4367
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:99624d4b402b5091552b9cb3dd9a793a3005e2ea
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:4367
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:4368
repl_backlog_histlen:0
127.0.0.1:6379> 

6379 節點的角色已經是 slave 了,並且主節點的是 6480 ,我們可以再看看 6480 節點的 info replication

127.0.0.1:6480> info replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6379,state=online,offset=4479,lag=1
master_replid:99624d4b402b5091552b9cb3dd9a793a3005e2ea
master_replid2:a54f3ba841c67762d6c1e33456c97b94c62f6ac0
master_repl_offset:4479
second_repl_offset:4368
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:4479
127.0.0.1:6480> 

在 6480 節點上有 6379 從節點的信息,可以看出 slaveof 命令已經幫我們完成了主服務器的切換。

複製技術的原理

redis 的主從架構好像很簡單一樣,我們就執行了一條命令就成功搭建了主從架構,並且數據複製也沒有問題,使用起來確實簡單,但是這背後 redis 還是幫我們做了很多的事情,比如主從服務器之間的數據同步、主從服務器的狀態檢測等,這背後 redis 是如何實現的呢?接下來我們就一起看看

數據複製原理

我們執行完 slaveof 命令之後,我們的主從關係就建立好了,在這個過程中, master 服務器與 slave 服務器之間需要經歷多個步驟,如下圖所示:

slaveof 命令背後,主從服務器大致經歷了七步,其中權限驗證這一步不是必須的,為了能夠更好的理解這些步驟,就以我們上面搭建的 redis 實例為例來詳細聊一聊各步驟。

1、保存主節點信息

在 6480 的客戶端向 6480 節點服務器發送 slaveof 127.0.0.1 6379 命令時,我們會立馬得到一個 OK

127.0.0.1:6480> slaveof 127.0.0.1 6379
OK
127.0.0.1:6480> 

這時候數據複製工作並沒有開始,數據複製工作是在返回 OK 之後才開始執行的,這時候 6480 從節點做的事情是將給定的主服務器 IP 地址 127.0.0.1 以及端口 6379 保存到服務器狀態的 masterhost 屬性和 masterport 屬性裏面

2、建立 socket 連接

在 slaveof 命令執行完之後,從服務器會根據命令設置的 IP 地址和端口,跟主服務器創建套接字連接, 如果從服務器能夠跟主服務器成功的建立 socket 連接,那麼從服務器將會為這個 socket 關聯一個專門用於處理複製工作的文件事件處理器,這個處理器將負責後續的複製工作,比如接受全量複製的 RDB 文件以及服務器傳來的寫命令。同樣主服務器在接受從服務器的 socket 連接之後,將為該 socket 創建一個客戶端狀態,這時候的從服務器同時具有服務器和客戶端兩個身份,從服務器可以向主服務器發送命令請求而主服務器則會向從服務器返回命令回復。

3、發送 ping 命令

從服務器與主服務器連接成功后,做的第一件事情就是向主服務器發送一個 ping 命令,發送 ping 命令主要有以下目的:

  • 檢測主從之間網絡套接字是否可用
  • 檢測主節點當前是否可接受處理命令

在發送 ping 命令之後,正常情況下主服務器會返回 pong 命令,接受到主服務器返回的 pong 回復之後就會進行下一步工作,如果沒有收到主節點的 pong 回復或者超時,比如網絡超時或者主節點正在阻塞無法響應命令,從服務器會斷開複製連接,等待下一次定時任務的調度。

4、身份驗證

從服務器在接收到主服務器返回的 pong 回復之後,下一步要做的事情就是根據配置信息決定是否需要身份驗證:

  • 如果從服務器設置了 masterauth 參數,則進行身份驗證
  • 如果從服務器沒有設置 masterauth 參數,則不進行身份驗證

在需要身份驗證的情況下,從服務器將就向主服務器發送一條 auth 命令,命令參數為從服務器 masterauth 選項的值,舉個例子,如果從服務器的配置里將 masterauth 參數設置為:123456,那麼從服務器將向主服務器發送 auth 123456 命令,身份驗證的過程也不是一帆風順的,可能會遇到以下幾種情況:

  • 從服務器通過 auth 命令發送的密碼與主服務器的 requirepass 參數值一致,那麼將繼續進行後續操作,如果密碼不一致,主服務將返回一個 invalid password 錯誤
  • 如果主服務器沒有設置 requirepass 參數,那麼主服務器將返回一個 no password is set 錯誤

所有的錯誤情況都會令從服務器中止當前的複製工作,並且要從建立 socket 開始重新發起複制流程,直到身份驗證通過或者從服務器放棄執行複製為止

5、發送端口信息

在身份驗證通過後,從服務器將執行 REPLCONF listening 命令,向主服務器發送從服務器的監聽端口號,例如在我們的例子中從服務器監聽的端口為 6480,那麼從服務器將向主服務器發送 REPLCONF listening 6480 命令,主服務器接收到這個命令之後,會將端口號記錄在從服務器所對應的客戶端狀態的 slave_listening_port 屬性了,也就是我們在 master 服務器的 info replication 裏面看到的 port 值。

6、數據複製

數據複製是最複雜的一塊了,由 psync 命令來完成,從服務器會向主服務器發送一個 psync 命令來進行數據同步,在 redis 2.8 版本以前使用的是 sync 命令,除了命令不同之外,在複製的方式上也有很大的不同,在 redis 2.8 版本以前使用的都是全量複製,這對主節點和網絡會造成很大的開銷,在 redis 2.8 版本以後,數據同步將分為全量同步和部分同步。

  • 全量複製:一般用於初次複製場景,不管是新舊版本的 redis 在從服務器第一次與主服務連接時都將進行一次全量複製,它會把主節點的全部數據一次性發給從節點,當數據較大時,會對主節點和網絡造成很大的開銷,redis 的早期版本只支持全量複製,這不是一種高效的數據複製方式

  • 部分複製:用於處理在主從複製中因網絡閃斷等原因造成的數據丟失 場景,當從節點再次連上主節點后,如果條件允許,主節點會補發丟失數據 給從節點。因為補發的數據遠遠小於全量數據,可以有效避免全量複製的過高開銷,部分複製是對老版複製的重大優化,有效避免了不必要的全量複製操作

redis 之所以能夠支持全量複製和部分複製,主要是對 sync 命令的優化,在 redis 2.8 版本以後使用的是一個全新的 psync 命令,命令格式為:psync {runId} {offset},這兩個參數的意義:

  • runId:主節點運行的id
  • offset:當前從節點複製的數據偏移量

也許你對上面的 runid、offset 比較陌生,沒關係,我們先來看看下面三個概念:

1、複製偏移量

參与複製的主從節點都會分別維護自身複製偏移量:主服務器每次向從服務器傳播 N 個字節的數據時,就將自己的偏移量的值加上 N,從服務器每次接收到主服務器傳播的 N個字節的數據時,將自己的偏移量值加上 N。通過對比主從服務器的複製偏移量,就可以知道主從服務器的數據是否一致,如果主從服務器的偏移量總是相同,那麼主從數據一致,相反,如果主從服務器兩個的偏移量並不相同,那麼說明主從服務器並未處於數據一致的狀態,比如在有多個從服務器時,在傳輸的過程中某一個服務器離線了,如下圖所示:

由於從服務器A 在數據傳輸時,由於網絡原因掉線了,導致偏移量與主服務器不一致,那麼當從服務器A 重啟並且與主服務器連接成功后,重新向主服務器發送 psync 命令,這時候數據複製應該執行全量複製還是部分複製呢?如果執行部分複製,主服務器又如何補償從服務器A 在斷線期間丟失的那部分數據呢?這些問題的答案都在複製積壓緩衝區裏面

2、複製積壓緩衝區

複製積壓緩衝區是保存在主節點上的一個固定長度的隊列,默認大小為 1MB,當主節點有連接的從節點(slave)時被創建,這時主節點(master) 響應寫命令時,不但會把命令發送給從節點,還會寫入複製積壓緩衝區,如下圖所示:

因此,主服務器的複製積壓緩衝區裏面會保存着一部分最近傳播的寫命令,並且複製積壓緩衝區會為隊列中的每個字節記錄相應的複製偏移量。所以當從服務器重新連上主服務器時,從服務器通過 psync 命令將自己的複製偏移量 offset 發送給主服務器,主服務器會根據這個複製偏移量來決定對從服務器執行何種數據同步操作:

  • 如果從服務器的複製偏移量之後的數據仍然存在於複製積壓緩衝區裏面,那麼主服務器將對從服務器執行部分複製操作
  • 如果從服務器的複製偏移量之後的數據不存在於複製積壓緩衝區裏面,那麼主服務器將對從服務器執行全量複製操作

3、服務器運行ID

每個 Redis 節點啟動后都會動態分配一個 40 位的十六進制字符串作為運行 ID,運行 ID 的主要作用是用來唯一識別 Redis 節點,我們可以使用 info server 命令來查看

127.0.0.1:6379> info server
# Server
redis_version:5.0.5
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:2ef1d58592147923
redis_mode:standalone
os:Linux 3.10.0-957.27.2.el7.x86_64 x86_64
arch_bits:64
multiplexing_api:epoll
atomicvar_api:atomic-builtin
gcc_version:4.8.5
process_id:25214
run_id:7b987673dfb4dfc10dd8d65b9a198e239d20d2b1
tcp_port:6379
uptime_in_seconds:14382
uptime_in_days:0
hz:10
configured_hz:10
lru_clock:14554933
executable:/usr/local/redis-5.0.5/src/./redis-server
config_file:/usr/local/redis-5.0.5/redis.conf
127.0.0.1:6379> 

這裏面有一個run_id 字段就是服務器運行的ID

了解這幾個概念之後,我們一起來看看 psync 命令的運行流程,psync 命令運行流程如下圖所示:

psync 命令的邏輯比較簡單,整個流程分為兩步:

1、從節點發送 psync 命令給主節點,參數 runId 是當前從節點保存的主節點運行ID,參數offset是當前從節點保存的複製偏移量,如果是第一次參与複製則默認值為 -1。

2、主節點接收到 psync 命令之後,會向從服務器返回以下三種回復中的一種:

  • 回復 +FULLRESYNC {runId} {offset}:表示主服務器將與從服務器執行一次全量複製操作,其中 runid 是這個主服務器的運行 id,從服務器會保存這個id,在下一次發送 psync 命令時使用,而 offset 則是主服務器當前的複製偏移量,從服務器會將這個值作為自己的初始化偏移量
  • 回復 +CONTINUE:那麼表示主服務器與從服務器將執行部分複製操作,從服務器只要等着主服務器將自己缺少的那部分數據發送過來就可以了
  • 回復 +ERR:那麼表示主服務器的版本低於 redis 2.8,它識別不了 psync 命令,從服務器將向主服務器發送 sync 命令,並與主服務器執行全量複製

7、命令持續複製

當主節點把當前的數據同步給從節點后,便完成了複製的建立流程。但是主從服務器並不會斷開連接,因為接下來主節點會持續地把寫命令發送給從節點,保證主從數據一致性。

經過上面 7 步就完成了主從服務器之間的數據同步,由於這篇文章的篇幅比較長,關於全量複製和部分複製的細節就不介紹了,全量複製就是將主節點的當前的數據生產 RDB 文件,發送給從服務器,從服務器再從本地磁盤加載,這樣當文件過大時就需要特別大的網絡開銷,不然由於數據傳輸比較慢會導致主從數據延時較大,部分複製就是主服務器將複製積壓緩衝區的寫命令直接發送給從服務器。

心跳檢測

心跳檢測是發生在主從節點在建立複製后,它們之間維護着長連接並彼此發送心跳命令,便以後續持續發送寫命令,主從心跳檢測如下圖所示:

主從節點彼此都有心跳檢測機制,各自模擬成對方的客戶端進行通信,主從心跳檢測的規則如下:

  • 主節點默認每隔 10 秒對從節點發送 ping 命令,判斷從節點的存活性和連接狀態。可通過修改 redis.conf 配置文件裏面的 repl-ping-replica-period 參數來控制發送頻率
  • 從節點在主線程中每隔 1 秒發送 replconf ack {offset} 命令,給主節點 上報自身當前的複製偏移量,這條命令除了檢測主從節點網絡之外,還通過發送複製偏移量來保證主從的數據一致

主節點根據 replconf 命令判斷從節點超時時間,體現在 info replication 統 計中的 lag 信息中,我們在主服務器上執行 info replication 命令:

127.0.0.1:6379> info replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6480,state=online,offset=25774,lag=0
master_replid:c62b6621e3acac55d122556a94f92d8679d93ea0
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:25774
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:25774
127.0.0.1:6379> 

可以看出 slave0 字段的值最後面有一個 lag,lag 表示與從節點最後一次通信延遲的秒數,正常延遲應該在 0 和 1 之間。如果超過 repl-timeout 配置的值(默認60秒),則判定從節點下線並斷開複製客戶端連接,如果從節點重新恢復,心跳檢測會繼續進行。

主從拓撲架構

Redis的主從拓撲結構可以支持單層或多層複製關係,根據拓撲複雜性可以分為以下三種:一主一從、一主多從、樹狀主從架構

一主一從結構

一主一從結構是最簡單的複製拓撲結構,我們前面搭建的就是一主一從的架構,架構如圖所示:

一主一從架構用於主節點出現宕機時從節點 提供故障轉移支持,當應用寫命令併發量較高且需要持久化時,可以只在從節點上開啟 AOF,這樣既保證數據安全性同時也避免了持久化對主節點的性能干擾。但是這裡有一個坑,需要你注意,就是當主節點關閉持久化功能時, 如果主節點脫機要避免自動重啟操作。因為主節點之前沒有開啟持久化功能自動重啟后數據集為空,這時從節點如果繼續複製主節點會導致從節點數據也被清空的情況,喪失了持久化的意義。安全的做法是在從節點上執行 slaveof no one 斷開與主節點的複製關係,再重啟主節點從而避免這一問題

一主多從架構

一主多從架構又稱為星形拓撲結構,一主多從架構如下圖所示:

一主多從架構可以實現讀寫分離來減輕主服務器的壓力,對於讀佔比較大的場景,可以把讀命令發送到 從節點來分擔主節點壓力。同時在日常開發中如果需要執行一些比較耗時的讀命令,如:keys、sort等,可以在其中一台從節點上執行,防止慢查詢對主節點造成阻塞從而影響線上服務的穩定性。對於寫併發量較高的場景,多個從節點會導致主節點寫命令的多次發送從而過度消耗網絡帶寬,同時也加重了主節點的負載影響服務穩定性。

樹狀主從架構

樹狀主從架構又稱為樹狀拓撲架構,樹狀主從架構如下圖所示:

樹狀主從架構使得從節點不但可以複製主節 數據,同時可以作為其他從節點的主節點繼續向下層複製。解決了一主多從架構中的不足,通過引入複製中 間層,可以有效降低主節點負載和需要傳送給從節點的數據量。如架構圖中,數據寫入節點A 後會同步到 B 和 C節點,B節點再把數據同步到 D 和 E節點,數據實現了一層一層的向下複製。當主節點需要掛載多個從節點時為了避免對主節點的性能干擾,可以採用樹狀主從結構降低主節點壓力。

最後

目前互聯網上很多大佬都有 Redis 系列教程,如有雷同,請多多包涵了。原創不易,碼字不易,還希望大家多多支持。若文中有所錯誤之處,還望提出,謝謝。

歡迎掃碼關注微信公眾號:「平頭哥的技術博文」,和平頭哥一起學習,一起進步。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品在網路上成為最夯、最多人討論的話題?

※高價收購3C產品,價格不怕你比較

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!