springboot + rabbitmq 做智能家居,我也沒想到會這麼簡單

本文收錄在個人博客:www.chengxy-nds.top,共享技術資源,共同進步

前一段有幸參与到一個智能家居項目的開發,由於之前都沒有過這方面的開發經驗,所以對智能硬件的開發模式和技術棧都頗為好奇。

產品是一款可燃氣體報警器,如果家中燃氣泄露濃度到達一定閾值,報警器檢測到並上傳氣體濃度值給後台,後台以電話、短信、微信等方式,提醒用戶家中可能有氣體泄漏。

用戶還可能向報警器發一些關閉報警、調整音量的指令等。整體功能還是比較簡單的,大致的邏輯如下圖所示:

但當我真正的參与其中開發時,其實有一點小小的失望,因為在整個研發過程中,並沒用到什麼新的技術,還是常規的幾種中間件,只不過換個用法而已。

技術選型用rabbitmq 來做核心的組件,主要考慮到運維成本低,組內成員使用的熟練度比較高。

下面和小夥伴分享一下如何用 springboot + rabbitmq 搭建物聯網(IOT)平台,其實智能硬件也沒想象的那麼高不可攀!

很多小夥伴可能有點懵?rabbitmq 不是消息隊列嗎?怎麼又能做智能硬件了

其實rabbitmq有兩種協議,我們平時接觸的消息隊列是用的AMQP協議,而用在智能硬件中的是MQTT協議。

一、什麼是 MQTT協議?

MQTT 全稱(Message Queue Telemetry Transport):一種基於發布/訂閱(publish/subscribe)模式的輕量級通訊協議,通過訂閱相應的主題來獲取消息,是物聯網(Internet of Thing)中的一個標準傳輸協議。

該協議將消息的發布者(publisher)與訂閱者(subscriber)進行分離,因此可以在不可靠的網絡環境中,為遠程連接的設備提供可靠的消息服務,使用方式與傳統的MQ有點類似。

TCP協議位於傳輸層,MQTT 協議位於應用層,MQTT 協議構建於TCP/IP協議上,也就是說只要支持TCP/IP協議棧的地方,都可以使用MQTT協議。

二、為什麼要用 MQTT協議?

MQTT協議為什麼在物聯網(IOT)中如此受偏愛?而不是其它協議,比如我們更為熟悉的 HTTP協議呢?

  • 首先HTTP協議它是一種同步協議,客戶端請求后需要等待服務器的響應。而在物聯網(IOT)環境中,設備會很受制於環境的影響,比如帶寬低、網絡延遲高、網絡通信不穩定等,顯然異步消息協議更為適合IOT應用程序。

  • HTTP是單向的,如果要獲取消息客戶端必須發起連接,而在物聯網(IOT)應用程序中,設備或傳感器往往都是客戶端,這意味着它們無法被動地接收來自網絡的命令。

  • 通常需要將一條命令或者消息,發送到網絡上的所有設備上。HTTP要實現這樣的功能不但很困難,而且成本極高。

三、MQTT協議介紹

前邊說過MQTT是一種輕量級的協議,它只專註於發消息, 所以此協議的結構也非常簡單。

MQTT數據包

MQTT協議中,一個MQTT數據包由:固定頭(Fixed header)、 可變頭(Variable header)、 消息體(payload)三部分構成。

  • 固定頭(Fixed header),所有數據包中都有固定頭,包含數據包類型及數據包的分組標識。
  • 可變頭(Variable header),部分數據包類型中有可變頭。
  • 內容消息體(Payload),存在於部分數據包類,是客戶端收到的具體消息內容。

1、固定頭

固定頭部,使用兩個字節,共16位:

(4-7)位表示消息類型,使用4位二進製表示,可代表如下的16種消息類型,不過 0 和 15位置屬於保留待用,所以共14種消息事件類型。

DUP Flag(重試標識)

DUP Flag:保證消息可靠傳輸,消息是否已送達的標識。默認為0,只佔用一個字節,表示第一次發送,當值為1時,表示當前消息先前已經被傳送過。

QoS Level(消息質量等級)

QoS Level:消息的質量等級,後邊會詳細介紹

RETAIN(持久化)

  • 值為1:表示發送的消息需要一直持久保存,而且不受服務器重啟影響,不但要發送給當前的訂閱者,且以後新加入的客戶端訂閱了此Topic,訂閱者也會馬上得到推送。
    注意:新加入的訂閱者,只會取出最新的一個RETAIN flag = 1的消息推送。

  • 值為0:僅為當前訂閱者推送此消息。

Remaining Length(剩餘長度)

在當前消息中剩餘的byte(字節)數,包含可變頭部和消息體payload。

2、可變頭

固定頭部僅定義了消息類型和一些標誌位,一些消息的元數據需要放入可變頭部中。可變頭部內容字節長度 + 消息體payload = 剩餘長度。

可變頭部居於固定頭部和payload中間,包含了協議名稱,版本號,連接標誌,用戶授權,心跳時間等內容。

可變頭存在於這些類型的消息:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。

3、消息體payload

消息體payload只存在於CONNECTPUBLISHSUBSCRIBESUBACKUNSUBSCRIBE這幾種類型的消息:

  • CONNECT:包含客戶端的ClientId、訂閱的TopicMessage以及用戶名密碼
  • PUBLISH:向對應主題發送消息。
  • SUBSCRIBE:要訂閱的主題以及QoS
  • SUBACK:服務器對於SUBSCRIBE所申請的主題及QoS進行確認和回復。
  • UNSUBSCRIBE:取消要訂閱的主題。

消息質量(QoS )

消息質量(Quality of Service),即消息的發送質量,發布者(publisher)和訂閱者(subscriber)都可以指定qos等級,有QoS 0QoS 1QoS 2三個等級。

下邊分別說明一下這三個等級的區別。

1、Qos 0At most once(至多一次),只發送一次消息,不保證消息是否成功送達,沒有確認機制,消息可能會丟失或重複。

2、Qos 1At least once(至少一次),相對於QoS 0而言Qos 1增加了ack確認機制,發送者(publisher)推送消息到MQTT代理(broker)時,兩者自身都會先持久化消息,只有當publisher 或者 Broker分別收到 PUBACK確認時,才會刪除自身持久化的消息,否則就會重發。

但有個問題,儘管我們可以通過確認來保證一定收到客戶端 或 服務器的message,可我們卻不能保證僅收到一次message,也就是當客戶端publisher沒收到Brokerpuback或者 Broker沒有收到subscriberpuback,那麼就會一直重發。

publisher -> broker 大致流程:

  1. publisher store msg -> publish ->broker (傳遞message)
  2. broker -> puback -> publisher delete msg (確認傳遞成功)

3、Qos 2Exactly once(只有一次),相對於QoS 1QoS 2升級實現了僅接受一次messagepublisherbroker 同樣對消息進行持久化,其中 publisher 緩存了message和 對應的msgID,而 broker 緩存了 msgID,可以保證消息不重複,由於又增加了一個confirm 機制,整個流程變得複雜很多。

publisher -> broker 大致流程:

  1. publisher store msg -> publish ->broker -> broker store
  2. msgID(傳遞message) broker -> puberc (確認傳遞成功)
  3. publisher -> pubrel ->broker delete msgID (告訴broker刪除msgID)
  4. broker -> pubcomp -> publisher delete msg (告訴publisher刪除msg)

LWT(最後遺囑)

LWT 全稱為 Last Will and Testament,其實遺囑是一個由客戶端預先定義好的主題和對應消息,附加在CONNECT的數據包中,包括遺願主題遺願 QoS遺願消息等。

當MQTT代理 Broker 檢測到有客戶端client非正常斷開連接時,再由服務器主動發布此消息,然後相關的訂閱者會收到消息。

舉個栗子:聊天室中所有人都訂閱一個叫talk的主題 ,但小富由於網絡抖動突然斷開了鏈接,這時聊天室中所有訂閱主題 talk的客戶端都會收到一個 “小富離開聊天室” 的遺願消息。

遺囑的相關參數:

  • Will Flag:是否使用 LWT,1 開啟
  • Will Topic:遺願主題名,不可使用通配符
  • Will Qos:發布遺願消息時使用的 QoS
  • Will Retain:遺願消息的 Retain 標識
  • Will Message:遺願消息內容

那客戶端Client 有哪些場景是非正常斷開連接呢?

  • Broker 檢測到底層的 I/O 異常;
  • 客戶端 未能在心跳 Keep Alive 的間隔內和 Broker 進行消息交互;
  • 客戶端 在關閉底層 TCP 連接前沒有發送 DISCONNECT 數據包;
  • 客戶端 發送錯誤格式的數據包到 Broker,導致關閉和客戶端的連接等。

注意:當客戶端通過發布 DISCONNECT 數據包斷開連接時,屬於正常斷開連接,並不會觸發 LWT 的機制,與此同時Broker 還會丟棄掉當前客戶端在連接時指定的相關 LWT 參數。

四、MQTT協議應用場景

MQTT協議廣泛應用於物聯網、移動互聯網、智能硬件、車聯網、電力能源等領域。使用的場景也是非常非常多,下邊列舉一些:

  • 物聯網M2M通信,物聯網大數據採集
  • Android消息推送,WEB消息推送
  • 移動即時消息,例如Facebook Messenger
  • 智能硬件、智能傢具、智能電器
  • 車聯網通信,電動車站樁採集
  • 智慧城市、遠程醫療、遠程教育
  • 電力、石油與能源等行業市場

五、代碼實現

具體 rabbitmq 的環境搭建就不贅述了,網上教程比較多,有條件的用服務器,沒條件的像我搞個Windows版的也很快樂嘛。

1、啟用 rabbitmq的mqtt協議

我們先開啟 rabbitmqmqtt協議,因為默認安裝下是關閉的,命令如下:

rabbitmq-plugins enable rabbitmq_mqtt

2、mqtt 客戶端依賴包

上一步中安裝rabbitmq環境並開啟 mqtt協議后,實際上mqtt 消息代理服務就搭建好了,接下來要做的就是實現客戶端消息的推送和訂閱。

這裏使用spring-integration-mqttorg.eclipse.paho.client.mqttv3兩個工具包實現。

<!--mqtt依賴包-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
       <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

3、消息發送者

消息的發送比較簡單,主要是應用到@ServiceActivator註解,需要注意messageHandler.setAsync屬性,如果設置成false,關閉異步模式發送消息時可能會阻塞。

@Configuration
public class IotMqttProducerConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(mqttConfig.getServers());
        return factory;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "iotMqttInputChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory());
        messageHandler.setAsync(false);
        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
        return messageHandler;
    }
}

MQTT 對外提供發送消息的API時,需要使用@MessagingGateway 註解,去提供一個消息網關代理,參數defaultRequestChannel 指定發送消息綁定的channel

可以實現三種API接口,payload 為發送的消息,topic 發送消息的主題,qos 消息質量。

@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel")
public interface IotMqttGateway {

    // 向默認的 topic 發送消息
    void sendMessage2Mqtt(String payload);
    // 向指定的 topic 發送消息
    void sendMessage2Mqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
    // 向指定的 topic 發送消息,並指定服務質量參數
    void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

4、消息訂閱

消息訂閱和我們平時用的MQ消息監聽實現思路基本相似,@ServiceActivator註解表明當前方法用於處理MQTT消息,inputChannel 參數指定了用於接收消息的channel

/**
 * @Author: xiaofu
 * @Description: 消息訂閱配置
 * @date 2020/6/8 18:24
 */
@Configuration
public class IotMqttSubscriberConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(mqttConfig.getServers());
        return factory;
    }

    @Bean
    public MessageChannel iotMqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(iotMqttInputChannel());
        return adapter;
    }

    /**
     * @author xiaofu
     * @description 消息訂閱
     * @date 2020/6/8 18:20
     */
    @Bean
    @ServiceActivator(inputChannel = "iotMqttInputChannel")
    public MessageHandler handlerTest() {

        return message -> {
            try {
                String string = message.getPayload().toString();
                System.out.println("接收到消息:" + string);
            } catch (MessagingException ex) {
                //logger.info(ex.getMessage());
            }
        };
    }
}

六、測試消息

額~ 由於本渣渣對硬件一竅不通,為了模擬硬件的發送消息,只能藉助一下工具,其實硬件端實現MQTT協議,跟我們前邊的基本沒什麼區別,只不過換種語言嵌入到硬件中而已。

這裏選的測試工具為mqttbox,下載地址:http://workswithweb.com/mqttbox.html

1、測試消息發送

我們用先用mqttbox模擬向主題mqtt_test_topic發送消息,看後台是否能成功接收到。

看到後台成功拿到了向主題mqtt_test_topic發送的消息。

2、測試消息訂閱

mqttbox模擬訂閱主題mqtt_test_topic,在後台向主題mqtt_test_topic發送一條消息,這裏我簡單的寫了個controller調用API發送消息。

http://127.0.0.1:8080/fun/testMqtt?topic=mqtt_test_topic&message=我是後台向主題 mqtt_test_topic 發送的消息

我們看mqttbox的訂閱消息,已經成功的接收到了後台的消息,到此我們的MQTT通信環境就算搭建成功了。如果把mqttbox工具換成具體硬件設備,整個流程就是我們常說的智能家居了,其實真的沒那麼難。

七、應用注意事項

在我們實際的生產環境中遇到過的問題,這裏分享一下讓大家少踩坑。

clientId 要唯一

在客戶端connect連接的時,會有一個clientId 參數,需要每個客戶端都保持唯一的。但我們在開發測試階段clientId直接在代碼中寫死了,而且服務都是單實例部署,並沒有暴露出什麼問題。

MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());

然而在生產環境內側的時候,由於服務是多實例集群部署,結果出現了下邊的奇怪問題。同一時間內只能有一個客戶端能拿到消息,其他客戶端不但不能消費消息,而且還在不斷的掉線重連:Lost connection: 已斷開連接; retrying...

這就是由於clientId相同導致客戶端間相互競爭消費,最後將clientId獲取方式換成從發號器中拿,問題就好了,所以這個地方是需要特別注意的。

平時程序在開發環境沒問題,可偏偏到了生產環境就一大堆問題,很多都是因為服務部署方式不同導致的。所以多學習分佈式還是很有必要的。

八、其他中間件

MQTT它只是一種協議,支持MQTT協議的消息中間件產品非常多,下邊的也只是其中的一部分

  • Mosquitto
  • Eclipse Paho
  • RabbitMQ
  • Apache ActiveMQ
  • HiveMQ
  • JoramMQ
  • ThingMQ
  • VerneMQ
  • Apache Apollo
  • emqttd Xively
  • IBM Websphere
    …..

總結

我也是第一次做和硬件相關的項目,之前聽到智能家居都會覺得好高大上,但實際上手開發后發現,技術嘛萬變不離其宗,也只是換種用法而已。

雙手奉上項目 demo 的github地址 :https://github.com/chengxy-nds/springboot-rabbitmq-mqtt.git

感興趣的小夥伴可以下載跑一跑,實現起來非常的簡單。

原創不易,燃燒秀髮輸出內容,希望你能有一丟丟收穫!

整理了幾百本各類技術电子書,送給小夥伴們,關注公號回復【666】自行領取。和一些小夥伴們建了一個技術交流群,一起探討技術、分享技術資料,旨在共同學習進步,如果感興趣就加入我們吧!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧

.NET Core Hangfire周期性作業調度問題

前言

四月中旬Hangfire團隊發布了1.7.11版本,在使用周期性作業調度過程中發現一個問題,這個問題應該一直未解決,故做此記錄,希望遇到的童鞋根據項目業務而避開這個問題。

周期性作業調度

我們依然是在控制台中進行測試,下載所需包請參考官方文檔,這裏不再敘述,首先我們在內存中存儲數據,如下:

var storageOpts = new MemoryStorageOptions();

GlobalConfiguration.Configuration.UseMemoryStorage(storageOpts);

using var server = new BackgroundJobServer();

RecurringJob.AddOrUpdate("job1", () => Print1(), "*/10 * * * * *", TimeZoneInfo.Local);

RecurringJob.AddOrUpdate("job2", () => Print2(), "*/10 * * * * *", TimeZoneInfo.Local);

RecurringJob.AddOrUpdate("job3", () => Print3(), "*/10 * * * * *", TimeZoneInfo.Local);
public static void Print1()
{
    Console.WriteLine("start1");
}

public static void Print2()
{
    Console.WriteLine("start2");
}

public static void Print3()
{
    Console.WriteLine("start3");
}

Hangfire已支持秒級(1.7+)周期作業調度,如上代碼,我們每隔10秒執行上述3個作業,打印如下:

 

基於內存存儲間隔10秒執行對應作業,根據上述打印結果來看沒有問題,接下來我們使用SQLite來存儲作業數據看看,首先下載Hangfire.SQLite包,針對控制台需進行如下配置

GlobalConfiguration.Configuration.UseSQLiteStorage("Data Source=./hangfire.db;");

當我們啟動控制台時一定會拋出如下異常,其異常旨在表明需要SQLite驅動

我們去下載微軟官方針對SQLite的驅動(Microsoft.Data.Sqlite)

 接下來我們將發現對於每一個作業都會重複執行多次,如下:

猜測只會在SQLite數據庫中才會存在問題吧,為了解決這個問題,做了一點點嘗試,但還是無法從根本上完全解決,我們知道Hangfire服務的默認工作數量為當前機器的處理器數量乘以5即(Environment.ProcessorCount * 5),那麼我們嘗試給1是不是可以規避這個問題

var options = new BackgroundJobServerOptions()
{
    WorkerCount = 1
};

using var server = new BackgroundJobServer(options);

 

上述設置后,我們可以看到貌似只執行了一次,但是這種情況還是是隨機的並不靠譜,比如多執行幾次看看,會出現如下可能情況

 

沒招了,找了下官方issue列表,發現此問題(https://github.com/mobydi/Hangfire.Sqlite/issues/2)一直處於打開狀態並未得到解決,所以要麼看看能否根據項目業務規避這個問題或者下載源碼自行調試解決

總結

本文是在使用Hangfire過程中發現SQLite數據庫出現的問題,因針對Hangfire的SQLite具體實現並不是官方團隊所提供,所以暫不能確定到底是Hangfire.SQLite包提供者的問題,根據issue描述大概率是Hangfire的一個bug,希望在SQLite存儲作業等數據存在的問題引起使用者注意。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

C#多線程編程(一)進程與線程

一、 進程

        簡單來說,進程是對資源的抽象,是資源的容器,在傳統操作系統中,進程是資源分配的基本單位,而且是執行的基本單位,進程支持併發執行,因為每個進程有獨立的數據,獨立的堆棧空間。一個程序想要併發執行,開多個進程即可。

Q1:在單核下,進程之間如何同時執行?

        首先要區分兩個概念——併發和并行

  • 併發:併發是指在一段微小的時間段中,有多個程序代碼段被CPU執行,宏觀上表現出來就是多個程序能”同時“執行。
  • 并行:并行是指在一個時間點,有多個程序段代碼被CPU執行,它才是真正的同時執行。

        所以應該說進程之間是併發執行。對於CPU來講,它不知道進程的存在,CPU主要與寄存器打交道。有一些常用的寄存器,如程序計數器寄存器,這個寄存器存儲了將要執行的指令的地址,這個寄存器的地址指向哪,CPU就去哪。還有一些堆棧寄存器和通用寄存器等等等,總之,這些數據構成了一個程序的執行環境,這個執行環境就叫做”上下文(Context)“,進程之間切換本質就是保存這些數據到內存,術語叫做”保存現場“,然後恢復某個進程的執行環境,也即是”恢復現場“,整個過程術語叫做“上下文切換”,具體點就是進程上下文切換,這就是進程之間能併發執行的本質——頻繁的切換進程上下文。這個功能是由操作系統提供的,是內核態的,對應用軟件開發人員透明。

二、 線程

        進程雖然支持併發,但是對併發不是很友好,不友好是指每開啟一個進程,都要重新分配一部分資源,而線程相對進程來說,創建線程的代價比創建進程要小,所以引入線程能更好的提高併發性。在現代操作系統中,進程變成了資源分配的基本單位,而線程變成了執行的基本單位,每個線程都有獨立的堆棧空間,同一個進程的所有線程共享代碼段和地址空間等共享資源。相應的上下文切換從進程上下文切換變成了線程上下文切換

三、 為什麼要引入進程和線程

  1. 提高CPU利用率,在早期的單道批處理系統中,如果執行中的代碼需要依賴與外部條件,將會導致CPU空閑,例如文件讀取,等待鍵盤信號輸入,這將浪費大量的CPU時間。引入多進程和線程可以解決CPU利用率低這個問題。
  2. 隔離程序之間的數據(每個進程都有單獨的地址空間),保證系統運行的穩定性。
  3. 提高系統的響應性和交互能力。

四、 在C#中創建託管線程

1. Thread類

在.NET中,託管線程分為:

  • 前台線程
  • 後台線程

一個.Net程序中,至少要有一個前台線程,所有前台線程結束了,所有的後台線程將會被公共語言運行時(CLR)強制銷毀,程序執行結束。

如下將在控制台程序中創建一個後台線程

 1 static void Main(string[] args)
 2 {
 3      var t = new Thread(() =>
 4      {
 5          Thread.Sleep(1000);
 6          Console.WriteLine("執行完畢");
 7      });
 8     t.IsBackground = true;
 9      t.Start();
10 }

View Code

 

主線程(默認是前台線程)執行完畢,程序直接退出。

但IsBackground 屬性改為false時,控制台會打印“執行完畢”。

2. 有什麼問題

直接使用Thread類來進行多線程編程浪費資源(服務器端更加明顯)且不方便,舉個栗子。

假如我寫一個Web服務器程序,每個請求創建一個線程,那麼每一次我都要new一個Thread對象,然後傳入處理HttpRequest的委託,處理完之後,線程將會被銷毀,這將會導致浪費大量CPU時間和內存,在早期CPU性能不行和內存資源珍貴的情況下這個缺點會被放大,在現在這個缺點不是很明顯,原因是硬件上來了。

不方便體現在哪呢?

  • 無法直接獲取另一個線程內未被捕捉的異常
  • 無法直接獲取線程函數的返回值

 

 1 public static void ThrowException()
 2 {
 3      throw new Exception("發生異常");
 4 }
 5 static void Main(string[] args)
 6 {
 7      var t = new Thread(() =>
 8      {
 9          Thread.Sleep(1000);
10          ThrowException();
11      });
12     t.IsBackground = false;
13      try
14      {
15          t.Start();
16      }
17      catch(Exception e)
18      {
19          Console.WriteLine(e.Message);
20      }
21 }

View Code

 

上述代碼將會導致程序奔潰,如下圖。

 

要想直接獲取返回值和可以直接從主線程捕捉線程函數內未捕捉的異常,我們可以這麼做。

新建一個MyTask.cs文件,內容如下

 1 using System;
 2 using System.Threading;
 3 namespace ConsoleApp1
 4 {
 5      public class MyTask
 6      {
 7          private Thread _thread;
 8          private Action _action;
 9          private Exception _innerException;
10         public MyTask()
11          {
12         }
13          public MyTask(Action action)
14          {
15              _action = action;
16          }
17          protected virtual void Excute()
18          {
19              try
20              {
21                  _action();
22              }
23              catch(Exception e)
24              {
25                  _innerException = e;
26              }
27       
28          }
29          public void Start()
30          {
31              if (_thread != null) throw new InvalidOperationException("任務已經開始");
32              _thread = new Thread(() => Excute());
33              _thread.Start();
34          }
35          public void Start(Action action)
36          {
37              _action = action;
38              if (_thread != null) throw new InvalidOperationException("任務已經開始");
39              _thread = new Thread(() => Excute());
40              _thread.Start();
41          }
42         public void Wait()
43          {
44              _thread.Join();
45              if (_innerException != null) throw _innerException;
46          }
47      }
48     public class MyTask<T> : MyTask
49      {
50          private Func<T> _func { get; }
51          private T _result;
52          public T Result {
53              
54              private set => _result = value;
55              get 
56              {
57                  base.Wait();
58                  return _result;
59              }
60          }
61          public MyTask(Func<T> func)
62          {
63              _func = func;
64          }
65         public new void Start() 
66          {
67              base.Start(() =>
68              {
69                  Result = _func();
70              });
71          }
72     }
73 }

View Code

 

簡單的包裝了一下(不要在意細節),我們便可以實現我們想要的效果。

測試代碼如下

 1 public static void ThrowException()
 2 {
 3      throw new Exception("發生異常");
 4 }
 5 public static void Test3()
 6 {
 7      MyTask<string> myTask = new MyTask<string>(() =>
 8      {
 9          Thread.Sleep(1000);
10          return "執行完畢";
11      });
12     myTask.Start();
13     try
14      {
15          Console.WriteLine(myTask.Result);
16      }
17      catch (Exception e)
18      {
19          Console.WriteLine(e.Message);
20      }
21 }
22 public static void Test2()
23 {
24      MyTask<string> myTask = new MyTask<string>(() =>
25      {
26          Thread.Sleep(1000);
27          ThrowException();
28          return "執行完畢";
29      });
30     myTask.Start();
31     try
32      {
33          Console.WriteLine(myTask.Result);
34      }
35      catch(Exception e)
36      {
37          Console.WriteLine(e.Message);
38      }
39 }
40 public static void Test1()
41 {
42      MyTask myTask = new MyTask(() =>
43      {
44          Thread.Sleep(1000);
45          ThrowException();
46      });
47      myTask.Start();
48     try
49      {
50          myTask.Wait();
51      }
52      catch (Exception e)
53      {
54          Console.WriteLine(e.Message);
55      }
56 }
57 static void Main(string[] args)
58 {
59      Test1();
60      Test2();
61      Test3();
62 }

 

可以看到,我們可以通過簡單包裝Thread對象,便可實現如下效果

  • 直接讀取線程函數返回值
  • 直接捕捉線程函數未捕捉的異常(前提是調用了Wait()函數或者Result屬性)

這是理解和運用Task的基礎,Task功能非常完善,但是運用好Task需要掌握許多概念,下面再說。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

原來你是這樣的BERT,i了i了! —— 超詳細BERT介紹(一)BERT主模型的結構及其組件

原來你是這樣的BERT,i了i了! —— 超詳細BERT介紹(一)BERT主模型的結構及其組件

BERTBidirectional Encoder Representations from Transformers)是谷歌在2018年10月推出的深度語言表示模型。

一經推出便席捲整個NLP領域,帶來了革命性的進步。
從此,無數英雄好漢競相投身於這場追劇(芝麻街)運動。
只聽得這邊G家110億,那邊M家又1750億,真是好不熱鬧!

然而大家真的了解BERT的具體構造,以及使用細節嗎?
本文就帶大家來細品一下。

前言

本系列文章分成三篇介紹BERT,本文主要介紹BERT主模型(BertModel)的結構及其組件相關知識,另有兩篇分別介紹BERT預訓練相關和如何將BERT應用到不同的下游任務

文章中的一些縮寫:NLP(natural language processing)自然語言處理;CV(computer vision)計算機視覺;DL(deep learning)深度學習;NLP&DL 自然語言處理和深度學習的交叉領域;CV&DL 計算機視覺和深度學習的交叉領域。

文章公式中的向量均為行向量,矩陣或張量的形狀均按照PyTorch的方式描述。
向量、矩陣或張量后的括號表示其形狀。

本系列文章的代碼均是基於transformers庫(v2.11.0)的代碼(基於Python語言、PyTorch框架)。
為便於理解,簡化了原代碼中不必要的部分,並保持主要功能等價。
在代碼最開始的地方,需要導入以下包:

代碼

from math import inf, sqrt
import torch as tc
from torch import nn
from torch.nn import functional as F
from transformers import PreTrainedModel

閱讀本系列文章需要一些背景知識,包括Word2VecLSTMTransformer-BaseELMoGPT等,由於本文不想過於冗長(其實是懶),以及相信來看本文的讀者們也都是衝著BERT來的,所以這部分內容還請讀者們自行學習。
本文假設讀者們均已有相關背景知識。

目錄

  • 1、主模型
    • 1.1、輸入
    • 1.2、嵌入層
      • 1.2.1、嵌入變換
      • 1.2.2、層標準化
      • 1.2.3、隨機失活
    • 1.3、編碼器
      • 1.3.1、隱藏層
        • 1.3.1.1、線性變換
        • 1.3.1.2、激活函數
          • 1.3.1.2.1、tanh
          • 1.3.1.2.2、softmax
          • 1.3.1.2.3、GELU
        • 1.3.1.3、多頭自注意力
        • 1.3.1.4、跳躍連接
    • 1.4、池化層
    • 1.5、輸出

1、主模型

BERT的主模型是BERT中最重要組件,BERT通過預訓練(pre-training),具體來說,就是在主模型后再接個專門的模塊計算預訓練的損失(loss),預訓練后就得到了主模型的參數(parameter),當應用到下游任務時,就在主模型後接個跟下游任務配套的模塊,然後主模型賦上預訓練的參數,下游任務模塊隨機初始化,然後微調(fine-tuning)就可以了(注意:微調的時候,主模型和下游任務模塊兩部分的參數一般都要調整,也可以凍結一部分,調整另一部分)。

主模型由三部分構成:嵌入層編碼器池化層
如圖:

其中

  • 輸入:一個個小批(mini-batch),小批里是batch_size個序列(句子或句子對),每個序列由若干個離散編碼向量組成。
  • 嵌入層:將輸入的序列轉換成連續分佈式表示(distributed representation),即詞嵌入(word embedding)或詞向量(word vector)。
  • 編碼器:對每個序列進行非線性表示。
  • 池化層:取出[CLS]標記(token)的表示(representation)作為整個序列的表示。
  • 輸出:編碼器最後一層輸出的表示(序列中每個標記的表示)和池化層輸出的表示(序列整體的表示)。

下面具體介紹這些部分。

1.1、輸入

一般來說,輸入BERT的可以是一句話:

I'm repairing immortals.

也可以是兩句話:

I'm repairing immortals. ||| Me too.

其中|||是分隔兩個句子的分隔符。

BERT先用專門的標記器(tokenizer)來標記(tokenize)序列,雙句標記后如下(單句類似):

I ' m repair ##ing immortal ##s . ||| Me too .

標記器其實就是先對句子進行基於規則的標記化(tokenization),這一步可以把'm以及句號.等分割開,再進行子詞分割(subword segmentation),示例中帶##的就是被子詞分割開的部分。
子詞分割有很多好處,比如壓縮詞彙表、表示未登錄詞(out of vocabulary words, OOV words)、表示單詞內部結構信息等,以後有時間專門寫一篇介紹這個。

數據集中的句子長度不一定相等,BERT採用固定輸入序列(長則截斷,短則填充)的方式來解決這個問題。
首先需要設定一個seq_length超參數(hyperparameter),然後判斷整個序列長度是否超出,如果超出:單句截掉最後超出的部分,雙句則先刪掉較長的那句話的末尾標記,如果兩句話長度相等,則輪流刪掉兩句話末尾的標記,直到總長度達到要求(即等長的兩句話刪掉的標記數量盡量相等);如果序列長度過小,則在句子最後添加[PAD]標記,使長度達到要求。

然後在序列最開始添加[CLS]標記,以及在每句話末尾添加[SEP]標記。
單句話添加一個[CLS]和一個[SEP],雙句話添加一個[CLS]和兩個[SEP]
[CLS]標記對應的表示作為整個序列的表示,[SEP]標記是專門用來分隔句子的。
注意:處理長度時需要考慮添加的[CLS][SEP]標記,使得最終總的長度=seq_length[PAD]標記在整個序列的最末尾。

例如seq_length=12,則單句變為:

[CLS] I ' m repair ##ing immortal ##s . [SEP] [PAD] [PAD]

如果seq_length=10,則雙句變為:

[CLS] I ' m repair [SEP] Me too . [SEP]

分割完后,每一個空格分割的子字符串(substring)都看成一個標記(token),標記器通過查表將這些標記映射成整數編碼。
單句如下:

[101, 146, 112, 182, 6949, 1158, 15642, 1116, 119, 102, 0, 0]

最後整個序列由四種類型的編碼向量表示,單句如下:

標記編碼:[101, 146, 112, 182, 6949, 1158, 15642, 1116, 119, 102, 0, 0]
位置編碼:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
句子位置編碼:[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
注意力掩碼:[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0]

其中,標記編碼就是上面的序列中每個標記轉成編碼后得到的向量;位置編碼記錄每個標記的位置;句子位置編碼記錄每個標記屬於哪句話,0是第一句話,1是第二句話(注意:[CLS]標記對應的是0);注意力掩碼記錄某個標記是否是填充的,1表示非填充,0表示填充。

雙句如下:

標記編碼:[101, 146, 112, 182, 6949, 102, 2508, 1315, 119, 102]
位置編碼:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
句子位置編碼:[0, 0, 0, 0, 0, 0, 1, 1, 1, 1]
注意力掩碼:[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

上面的是英文的情況,中文的話BERT直接用漢字級別表示,即

我在修仙( ̄︶ ̄)↗

這樣的句子分割成

我 在 修 仙 (  ̄ ︶  ̄ ) ↗

然後每個漢字(包括中文標點)看成一個標記,應用上述操作即可。

1.2、嵌入層

嵌入層的作用是將序列的離散編碼錶示轉換成連續分佈式表示。
離散編碼只能表示A和B相等或不等,但是如果將其表示成連續分佈式表示(即連續的N維空間向量),就可以計算\(A\)\(B\)之間的相似度或距離了,從而表達更多信息。
這個是詞嵌入或詞向量的知識,可以參考Word2Vec相關內容,本文不再贅述了。

嵌入層包含三種組件:嵌入變換(embedding)、層標準化(layer normalization)、隨機失活(dropout)。
如圖:

1.2.1、嵌入變換

嵌入變換實際上就是一個線性變換(linear transformation)。
傳統上,離散標記往往表示成一個獨熱碼(one-hot)向量,也叫標準基向量,即一個長度為\(V\)的向量,其中只有一位為\(1\),其他都為\(0\)
在NLP&DL領域,\(V\)一般是詞彙表的大小。
但是這種向量往往維數很高(詞彙表往往比較大)而且很稀疏(每個向量只有一位不為\(0\)),不好處理。
所以可以通過一個線性變換將這個向量轉換成低維稠密的向量。

假設\(v\)\(V\))是標記\(t\)的獨熱碼向量,\(W\)\(V \times H\))是一個\(V\)\(H\)列的矩陣,則\(t\)的嵌入\(e\)為:

\[e = v W \]

實際上\(W\)中每一行都可以看成一個詞嵌入,而這個矩陣乘就是把\(v\)中等於\(1\)的那個位置對應的\(W\)中的詞嵌入取出來。
在工程實踐中,由於獨熱碼向量比較占內存,而且矩陣乘效率也不高,所以往往用一個整數編碼來代替獨熱碼向量,然後直接用查表的方式取出對應的詞嵌入。

所以假設\(n\)\(t\)的編碼,一般是在詞彙表中的編號,那麼上面的公式就可以改成:

\[e = W_{n} \]

其中下標表示取出對應的行。

那麼一個標記化后的序列就可以表示成一個編碼向量。
假設序列\(T\)的編碼向量為\(s\)\(L\)),\(L\)為序列的長度,即\(T\)中有\(L\)個標記。
如果詞嵌入長度為\(H\),那麼經過嵌入變換,得到\(T\)的隱狀態(hidden state)\(h\)\(L \times H\))。

1.2.2、層標準化

層標準化類似於批標準化(batch normalization),可以加速模型訓練,但其實現方式和批標準化不一樣,層標準化是沿着詞嵌入(通道)維進行標準化的,不需要在訓練時存儲統計量來估計整體數據集的均值和方差,訓練(training)和評估(evaluation)或推理(inference)階段的操作是相同的。
另外批標準化對小批大小有限制,而層標準化則沒有限制。

假設輸入的一個詞嵌入為\(e = [x_0, x_1, …, x_{H-1}]\)\(x_k\)\(e\)\(k = 0, 1, …, (H-1)\) 維的分量,\(H\)是詞嵌入長度。
那麼層標準化就是

\[y_{k} = \frac{x_{k}-\mu}{\sigma} * \alpha_k + \beta_k \]

其中,\(y_{k}\)是輸出,\(\mu\)\(\sigma^2\)分別是均值和方差:

\[ \mu = \frac{1}{H} \sum_{k=0}^{H-1} x_{k} \\ \sigma^2 = \frac{1}{H} \sum_{k=0}^{H-1} (x_{k}-\mu)^2 \\ \]

\(\alpha_k\)\(\beta_k\)是學習得到的參數,用於防止模型表示能力退化。

注意:\(\mu\)\(\sigma^2\)是針對每個樣本每個位置的詞嵌入分別計算的,而\(\alpha_k\)\(\beta_k\)對所有的詞嵌入都是共用的;\(\sigma^2\)的計算沒有使用貝塞爾校正(Bessel’s correction)。

1.2.3、隨機失活

隨機失活是DL領域非常著名且常用的正則化(regularization)方法(然而被谷歌註冊專利了),用來防止模型過擬合(overfitting)。

具體來說,先設置一個超參數\(P \in [0, 1]\),表示按照概率\(P\)隨機將值置\(0\)
然後假設詞嵌入中某一維分量是\(x\),按照均勻隨機分佈產生一個隨機數\(r \in [0, 1]\),然後輸出值\(y\)為:

\[ y = \left\{ \begin{aligned} & \frac{x}{1-P} &, & r > P \\ & 0 &, & r \le P \\ \end{aligned} \right. \]

由於按照概率\(P\)\(0\),相當於輸出值的期望變成原來的\((1-P)\)倍,所以再對輸出值除以\((1-P)\),就可以保持期望不變。

以上操作針對訓練階段,在評估階段,輸出值等於輸入值:

\[y = x \]

嵌入層代碼如下:

代碼

# BERT之嵌入層
class BertEmb(nn.Module):
	def __init__(self, config):
		super().__init__()
		# 標記嵌入,padding_idx=0:編碼為0的嵌入始終為零向量
		self.tok_emb = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=0)
		# 位置嵌入
		self.pos_emb = nn.Embedding(config.max_position_embeddings, config.hidden_size)
		# 句子位置嵌入
		self.sent_pos_emb = nn.Embedding(config.type_vocab_size, config.hidden_size)

		# 層標準化
		self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
		# 隨機失活
		self.dropout = nn.Dropout(config.hidden_dropout_prob)

	def forward(self,
			tok_ids,  # 標記編碼(batch_size * seq_length)
			pos_ids=None,  # 位置編碼(batch_size * seq_length)
			sent_pos_ids=None,  # 句子位置編碼(batch_size * seq_length)
	):
		device = tok_ids.device  # 設備(CPU或CUDA)
		shape = tok_ids.shape  # 形狀(batch_size * seq_length)
		seq_length = shape[1]

		# 默認:[0, 1, ..., seq_length-1]
		if pos_ids is None:
			pos_ids = tc.arange(seq_length, dtype=tc.int64, device=device)
			pos_ids = pos_ids.unsqueeze(0).expand(shape)
		# 默認:[0, 0, ..., 0],即所有標記都屬於第一個句子
		if sent_pos_ids is None:
			sent_pos_ids = tc.zeros(shape, dtype=tc.int64, device=device)

		# 三種嵌入(batch_size * seq_length * hidden_size)
		tok_embs = self.tok_emb(tok_ids)
		pos_embs = self.pos_emb(pos_ids)
		sent_pos_embs = self.sent_pos_emb(sent_pos_ids)

		# 三種嵌入相加
		embs = tok_embs + pos_embs + sent_pos_embs
		# 層標準化嵌入
		embs = self.layer_norm(embs)
		# 隨機失活嵌入
		embs = self.dropout(embs)
		return embs  # 嵌入(batch_size * seq_length * hidden_size)

其中,
config是BERT的配置文件對象,裏面記錄了各種預先設定的超參數;
vocab_size是詞彙表大小;
hidden_size是詞嵌入長度,默認是768(bert-base-*)或1024(bert-large-*);
max_position_embeddings是允許的最大標記位置,默認是512;
type_vocab_size是允許的最大句子位置,即最多能輸入的句子數量,默認是2;
layer_norm_eps是一個>0並很接近0的小數\(\epsilon\),用來防止計算時發生除0等異常操作;
hidden_dropout_prob是隨機失活概率,默認是0.1;
batch_size是小批的大小,即一個小批里的樣本個數;
seq_length是輸入的編碼向量的長度。

1.3、編碼器

編碼器的作用是對嵌入層輸出的隱狀態進行非線性表示,提取出其中的特徵(feature),它是由num_hidden_layers個結構相同(超參數相同)但參數不同(不共享參數)的隱藏層串連構成的。
如圖:

1.3.1、隱藏層

隱藏層包括線性變換、激活函數(activation function)、多頭自注意力(multi-head self-attention)、跳躍連接(skip connection),以及上面介紹過的層標準化和隨機失活。
如圖:

其中,激活函數默認是GELU,線性變換均是逐位置線性變換,即對不同樣本不同位置的詞嵌入應用相同的線性變換(類似於CV&DL領域的\(1 \times 1\)卷積)。

1.3.1.1、線性變換

線性變換在CV&DL領域也叫全連接層(fully connected layer),即

\[y = x W^T + b \]

其中,\(x\)\(A\))是輸入向量,\(y\)\(B\))是輸出向量,\(W\)\(B \times A\))是權重(weight)矩陣,\(b\)\(B\))是偏置(bias)向量;\(W\)\(b\)是學習得到的參數。

另外,嚴格來說,當\(b = \vec 0\)時,上式為線性變換;當\(b \ne \vec 0\)時,上式為仿射變換(affine transformation)。
但是在DL中,人們往往並不那麼摳字眼,對於這兩種變換,一般都簡單地稱為線性變換。

1.3.1.2、激活函數

激活函數在DL中非常關鍵!
因為如果要提高一個神經網絡(neural network)的表示能力,往往需要加深網絡的深度。
然而如果只疊加多個線性變換的話,這等價於一個線性變換(大家可以推推看)!
所以只有在線性變換後接一個非線性變換(nonlinear transformation),即激活函數,才能逐漸加深網絡並提高表示能力。

激活函數有很多,常見的包括sigmoidtanhsoftmaxReLUGELUSwishMish等。
本文只講和BERT相關的激活函數:tanh、softmax、GELU。

1.3.1.2.1、tanh

激活函數的一個功能是調整輸入值的取值範圍。
tanh即雙曲正切函數,可以將\((-\infty, +\infty)\)的數映射到\((-1, 1)\),並且嚴格單調。
函數圖像如圖:

tanh在NLP&DL領域用得比較多。

1.3.1.2.2、softmax

softmax顧名思義,它可以對輸入的一組數值根據其大小給出每個數值的概率,數值越大,概率越高,且概率求和為\(1\)

假設輸入\(x_k\)\(k = 0, 1, …, (N-1)\),則輸出值\(y_k\)為:

\[y_k = \frac{exp(x_k)}{\sum_{i=0}^{N-1} exp(x_i)} \]

實際上,對於任意一個對數幾率(logit)\(x \in (-\infty, +\infty)\)\(x\)越大,表示某個事件發生的可能性越大,softmax可以將其轉化為概率,即將取值範圍映射到\((0, 1)\)

1.3.1.2.3、GELU

GELUGaussian Error Linear Units)是2016年6月提出的一個激活函數。
GELU相比ReLU曲線更為光滑,允許梯度更好地傳播。
GELU的想法類似於隨機失活,隨機失活是按照0-1分佈,又叫兩點分佈,也叫伯努利分佈(Bernoulli distribution),隨機通過輸入值;而GELU則是將這個概率分佈改成正態分佈(Normal distribution),也叫高斯分佈(Gaussian distribution),然後輸出期望。

假設輸入值是\(x\),輸出值是\(y\),那麼GELU就是:

\[y = x P(X \le x) \]

其中,\(X \sim \mathcal{N}(0, 1)\)\(P\)為概率。

GELU的函數圖像如圖:

其中藍線為ReLU函數圖像,橙線為GELU函數圖像。

1.3.1.3、多頭自注意力

多頭自注意力是Transformer的一大特色。
多頭自注意力的名字可以分成三個詞:多頭、自、注意力:

  • 注意力:是DL領域近年來最重要的創新之一!可以使模型以不同的方式對待不同的輸入(即分配不同的權重),而無視空間(即輸入向量排成線形、面形、樹形、圖形等拓撲結構)的形狀、大小、距離。
  • 自:是在普通的注意力基礎上修改而來的,可以表示輸入與自身的依賴關係。
  • 多頭:是對注意力中涉及的向量分別拆分計算,從而提高表示能力。

對於一般的多頭注意力,假設計算\(x\)\(H\))對\(y_i\)\(H\)),\(i = 0, 1, …, (L-1)\),的多頭注意力,則首先計算\(q\)(H)、\(k_i\)(H)、\(v_i\)(H):

\[ q = x W_q^T + b_q \\ k_i = y_i W_k^T + b_k \\ v_i = y_i W_v^T + b_v \\ \]

其中,\(W_z\)\(H \times H\))和\(b_z\)\(H\))分別為權重矩陣和偏置向量,\(z \in \{ q, k, v \}\)
然後將這三種向量等長度拆分成\(S\)個向量,稱為頭向量:

\[ q_j = [q_0; q_1; …; q_{S-1}] \\ k_{ij} = [k_{i0}; k_{i1}; …; k_{i, S-1}] \\ v_{ij} = [v_{i0}; v_{i1}; …; v_{i, S-1}] \\ \]

上式中的分號為串連操作,即把多個向量拼接起來組成一個更長的向量。
其中,每個頭向量長度都為\(D\),且\(S \times D = H\)

然後計算\(q_j\)\(k_{ij}\)的注意力分數\(s_{ij}\)

\[s_{ij} = \frac{q_j k_{ij}^T}{\sqrt{D}} \]

之後可以添加註意力掩碼(也可以不加),即令\(s_{mj} = -\infty\)\(m\)是需要添加掩碼的位置。
然後通過softmax計算注意力概率\(p_{ij}\)

\[p_{ij} = \frac{exp(s_{ij})}{\sum_{t=0}^{L-1} exp(s_{tj})} \]

之後對注意力概率進行隨機失活:

\[\hat{p}_{ij} = dropout(p_{ij}) \]

再之後計算輸出向量\(r_j\)\(D\)):

\[r_j = \sum_{i=0}^{L-1} \hat{p}_{ij} v_{ij} \]

最終的輸出向量是把每一頭的輸出向量串連起來:

\[r = [r_0; r_1; …; r_{S-1}] \]

其中\(r\)\(H\))為最終的輸出向量。

如果令\(x = y_n\)\(n \in \{ 0, 1, …, L-1 \}\),即\(x\)\(y_i\)中的某一個向量,那麼多頭注意力就變為多頭自注意力。

代碼如下:

代碼

# BERT之多頭自注意力
class BertMultiHeadSelfAtt(nn.Module):
	def __init__(self, config):
		super().__init__()
		# 注意力頭數
		self.num_heads = config.num_attention_heads
		# 注意力頭向量長度
		self.head_size = config.hidden_size // config.num_attention_heads

		self.query = nn.Linear(config.hidden_size, config.hidden_size)
		self.key = nn.Linear(config.hidden_size, config.hidden_size)
		self.value = nn.Linear(config.hidden_size, config.hidden_size)

		self.dropout = nn.Dropout(config.attention_probs_dropout_prob)

	# 輸入(batch_size * seq_length * hidden_size)
	# 輸出(batch_size * num_heads * seq_length * head_size)
	def shape(self, x):
		shape = (*x.shape[:2], self.num_heads, self.head_size)
		return x.view(*shape).transpose(1, 2)
	# 輸入(batch_size * num_heads * seq_length * head_size)
	# 輸出(batch_size * seq_length * hidden_size)
	def unshape(self, x):
		x = x.transpose(1, 2).contiguous()
		return x.view(*x.shape[:2], -1)

	def forward(self,
			inputs,  # 輸入(batch_size * seq_length * hidden_size)
			att_masks=None,  # 注意力掩碼(batch_size * seq_length * hidden_size)
	):
		mixed_querys = self.query(inputs)
		mixed_keys = self.key(inputs)
		mixed_values = self.value(inputs)

		querys = self.shape(mixed_querys)
		keys = self.shape(mixed_keys)
		values = self.shape(mixed_values)

		# 注意力分數(batch_size * num_heads * seq_length * seq_length)
		att_scores = querys.matmul(keys.transpose(2, 3))
		# 縮放注意力分數
		att_scores = att_scores / sqrt(self.head_size)
		# 添加註意力掩碼
		if att_masks is not None:
			att_scores = att_scores + att_masks

		# 注意力概率(batch_size * num_heads * seq_length * seq_length)
		att_probs = att_scores.softmax(dim=-1)
		# 隨機失活注意力概率
		att_probs = self.dropout(att_probs)

		# 輸出(batch_size * num_heads * seq_length * head_size)
		outputs = att_probs.matmul(values)
		outputs = self.unshape(outputs)
		return outputs  # 輸出(batch_size * seq_length * hidden_size)

其中,
num_attention_heads是注意力頭數,默認是12(bert-base-*)或16(bert-large-*);
attention_probs_dropout_prob是注意力概率的隨機失活概率,默認是0.1。

1.3.1.4、跳躍連接

跳躍連接也是DL領域近年來最重要的創新之一!
跳躍連接也叫殘差連接(residual connection)。
一般來說,傳統的神經網絡往往是一層接一層串連而成,前一層輸出作為後一層輸入。
而跳躍連接則是某一層的輸出,跳過若干層,直接輸入某個更深的層。
例如BERT的每個隱藏層中有兩個跳躍連接。

跳躍連接的作用是防止神經網絡梯度消失或梯度爆炸,使損失曲面(loss surface)更平滑,從而使模型更容易訓練,使神經網絡可以設置得更深。

按我個人的理解,一般來說,線性變換是最能保持輸入信息的,而非線性變換則往往會損失一部分信息,但是為了網絡的表示能力不得不線性變換與非線性變換多次堆疊,這樣網絡深層接收到的信息與最初輸入的信息比可能已經面目全非,而跳躍連接則可以讓輸入信息原汁原味地傳播得更深。

隱藏層代碼如下:

代碼

# BERT之隱藏層
class BertLayer(nn.Module):
	# noinspection PyUnresolvedReferences
	def __init__(self, config):
		super().__init__()
		# 多頭自注意力
		self.multi_head_self_att = BertMultiHeadSelfAtt(config)

		self.linear = nn.Linear(config.hidden_size, config.hidden_size)
		self.dropout = nn.Dropout(config.hidden_dropout_prob)
		self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)

		# 升維線性變換
		self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
		# 激活函數,默認:GELU
		self.act_fct = F.gelu

		# 降維線性變換,使向量大小保持不變
		self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
		self.dropout_1 = nn.Dropout(config.hidden_dropout_prob)
		self.layer_norm_1 = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
	def forward(self,
			inputs,  # 輸入(batch_size * seq_length * hidden_size)
			att_masks=None,  # 注意力掩碼(batch_size * seq_length * hidden_size)
	):
		outputs = self.multi_head_self_att(inputs, att_masks=att_masks)
		outputs = self.linear(outputs)
		outputs = self.dropout(outputs)
		att_outputs = self.layer_norm(outputs + inputs)  # 跳躍連接

		outputs = self.linear_1(att_outputs)
		outputs = self.act_fct(outputs)

		outputs = self.linear_2(outputs)
		outputs = self.dropout_1(outputs)
		outputs = self.layer_norm_1(outputs + att_outputs)  # 跳躍連接
		return outputs  # 輸出(batch_size * seq_length * hidden_size)

其中,
intermediate_size是中間一個升維線性變換升維后的長度,默認是3072(bert-base-*)或4096(bert-large-*)。

編碼器代碼如下:

代碼

# BERT之編碼器
class BertEnc(nn.Module):
	def __init__(self, config):
		super().__init__()
		# num_hidden_layers個隱藏層
		self.layers = nn.ModuleList([BertLayer(config)
			for _ in range(config.num_hidden_layers)])
	# noinspection PyTypeChecker
	def forward(self,
			inputs,  # 輸入(batch_size * seq_length * hidden_size)
			att_masks=None,  # 注意力掩碼(batch_size * seq_length)
	):
		# 調整注意力掩碼的值和形狀
		if att_masks is not None:
			device = inputs.device  # 設備(CPU或CUDA)
			dtype = inputs.dtype  # 數據類型(float16、float32或float64)
			shape = att_masks.shape  # 形狀(batch_size * seq_length)
			t = tc.zeros(shape, dtype=dtype, device=device)
			t[att_masks<=0] = -inf  # exp(-inf) = 0
			t = t[:, None, None, :]
			att_masks = t

		outputs = inputs
		for layer in self.layers:
			outputs = layer(outputs, att_masks=att_masks)
		return outputs  # 輸出(batch_size * seq_length * hidden_size)

其中,
num_hidden_layers是隱藏層數量,默認是12(bert-base-*)或24(bert-large-*)。

1.4、池化層

池化層是將[CLS]標記對應的表示取出來,並做一定的變換,作為整個序列的表示並返回,以及原封不動地返回所有的標記表示。
如圖:

其中,激活函數默認是tanh。

池化層代碼如下:

代碼

# BERT之池化層
class BertPool(nn.Module):
	def __init__(self, config):
		super().__init__()
		self.linear = nn.Linear(config.hidden_size, config.hidden_size)
		self.act_fct = F.tanh
	def forward(self,
			inputs,  # 輸入(batch_size * seq_length * hidden_size)
	):
		# 取[CLS]標記的表示
		outputs = inputs[:, 0]
		outputs = self.linear(outputs)
		outputs = self.act_fct(outputs)
		return outputs  # 輸出(batch_size * hidden_size)

1.5、輸出

主模型最後輸出所有的標記表示和整體的序列表示,分別用於針對每個標記的預測任務和針對整個序列的預測任務。

主模型代碼如下:

代碼

# BERT之預訓練模型抽象基類
class BertPreTrainedModel(PreTrainedModel):
	from transformers import BertConfig
	from transformers import BERT_PRETRAINED_MODEL_ARCHIVE_MAP
	from transformers import load_tf_weights_in_bert

	config_class = BertConfig
	pretrained_model_archive_map = BERT_PRETRAINED_MODEL_ARCHIVE_MAP
	load_tf_weights = load_tf_weights_in_bert
	base_model_prefix = 'bert'

	# 注意力頭剪枝
	def _prune_heads(self, heads_to_prune):
		pass
	# 參數初始化
	def _init_weights(self, module):
		config = self.config
		f = lambda x: x is not None and x.requires_grad
		if isinstance(module, nn.Embedding):
			if f(module.weight):
				# 正態分佈隨機初始化
				module.weight.data.normal_(mean=0.0, std=config.initializer_range)
		elif isinstance(module, nn.Linear):
			if f(module.weight):
				# 正態分佈隨機初始化
				module.weight.data.normal_(mean=0.0, std=config.initializer_range)
			if f(module.bias):
				# 初始為0
				module.bias.data.zero_()
		elif isinstance(module, nn.LayerNorm):
			if f(module.weight):
				# 初始為1
				module.weight.data.fill_(1.0)
			if f(module.bias):
				# 初始為0
				module.bias.data.zero_()
# BERT之主模型
class BertModel(BertPreTrainedModel):
	def __init__(self, config):
		super().__init__(config)
		self.config = config
		# 嵌入層
		self.emb = BertEmb(config)
		# 編碼器
		self.enc = BertEnc(config)
		# 池化層
		self.pool = BertPool(config)
		# 參數初始化
		self.init_weights()

	# noinspection PyUnresolvedReferences
	def get_input_embeddings(self):
		return self.emb.tok_emb
	def set_input_embeddings(self, embs):
		self.emb.tok_emb = embs

	def forward(self,
			tok_ids,  # 標記編碼(batch_size * seq_length)
			pos_ids=None,  # 位置編碼(batch_size * seq_length)
			sent_pos_ids=None,  # 句子位置編碼(batch_size * seq_length)
			att_masks=None,  # 注意力掩碼(batch_size * seq_length)
	):
		outputs = self.emb(tok_ids, pos_ids=pos_ids, sent_pos_ids=sent_pos_ids)
		outputs = self.enc(outputs, att_masks=att_masks)
		pooled_outputs = self.pool(outputs)
		return (
			outputs,  # 輸出(batch_size * seq_length * hidden_size)
			pooled_outputs,  # 池化輸出(batch_size * hidden_size)
		)

其中,
BertPreTrainedModel是預訓練模型抽象基類,用於完成一些初始化工作。

後記

本文詳細地介紹了BERT主模型的結構及其組件,了解它的構造以及代碼實現對於理解以及應用BERT有非常大的幫助。
後續兩篇文章會分別介紹BERT預訓練下游任務相關。

從BERT主模型的結構中,我們可以發現,BERT拋棄了RNN架構,而只用注意力機制來抽取長距離依賴(這個其實是Transformer架構的特點)。
由於注意力可以并行計算,而RNN必須串行計算,這就使得模型計算效率大大提升,於是BERT這類模型也能夠堆得很深。
BERT為了能夠同時做單句和雙句的序列和標記的預測任務,設計了[CLS][SEP]等特殊標記分別作為序列表示以及標記不同的句子邊界,整體採用了桶狀的模型結構,即輸入時隱狀態的形狀與輸出時隱狀態的形狀相等(只是在每個隱藏層有升維與降維操作,整體上詞嵌入長度保持不變)。
由於注意力機制對距離不敏感,所以BERT額外添加了位置特徵。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧

南極「頰紋企鵝」 半世紀數量減少七成七

摘錄自2020年2月12日公視報導

美國生物學家最近在南極調查發現,當地常見的一種「頰紋企鵝」,數量比50年前大幅銳減了七成七。科學家指出,南極溫度在過去2、30年間上升了5°C。而冰棚快速融解崩塌,讓企鵝失去原本的棲息地,食物供應也大受影響。

保育生物學家佛瑞斯特表示,「我們認為這裡基礎食物鏈的建構出了問題,有可能是浮游生物或是磷蝦減少,食物不如以往充足,導致企鵝數量不斷減少。問題在於這種情況會不會持續惡化。」

頰紋企鵝、皇帝企鵝和國王企鵝,數量都在急劇減少,這也意味著生態的多樣性與豐富性正遭遇巨大衝擊。環保團體呼籲聯合國在2030年之前,將全球30%的海域列為保護區,因為目前只有4%受到保護實在太少。而從目前企鵝數量銳減的情況看來,將南極海域列為保護區已經刻不容緩。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

IEA:離岸風力發電可完全滿足全球電力需求

摘錄自2019年10月26日鉅亨網報導

國際能源總署(IEA)25 日發布指出,海上風力發電可以產生足夠的電力,供應地球上每個家庭和企業,IEA 形容這份報告是歷來對海上風電「最全面的全球研究」結果,該研究針對數十萬英里的海岸線進行分析研究。

IEA 指出,雖然離岸風電在 2010 年至 2018 年高速發展,但是當今的海上風電市場甚至還沒有充分挖掘出全部潛力。

該報告指出,僅開發近岸的主要風力電場所能提供的電力就已超過當今全球消耗的電力總量。但是,海上風電所能生產的最大潛力超過 12 萬百萬瓩,是 2040 年預計全球電力需求的 11 倍,不過這個估算並未將傳輸和儲存電力的困難納入考慮範圍。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

升溫惹的禍 歐盟首次以地圖示警 世紀末英、法、荷等多處城市將泡水

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧

廣東省電動汽車產業標準體系建設項目正式啟動

近日,廣東省產業標準體系及公共技術創新服務平臺建設專案在深圳啟動,加快對《廣東省電動汽車產業標準體系規劃與路線圖(2011-2015)》提出的72項缺失標準的研製,以促進廣東省電動汽車業快速發展。

此次啟動的項目將充分整合廣東全省電動汽車產業在人才、技術、設備和科研等方面的優勢,破解目前面臨的難題。同時,廣東省將建立電動汽車標準與專利資訊服務平臺,提供國內外電動汽車標準體系資料庫查詢、標準資訊服務等。到2015年,廣東全省將初步建立適應電動汽車發展要求的配套設施網路,形成電動汽車地方性標準規範,實現電動汽車在城市公交系統的規模應用,讓電動轎車走入普通家庭。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※幫你省時又省力,新北清潔一流服務好口碑

※回頭車貨運收費標準

日本研發“軟綿綿”電動汽車最高時速50公里

近日,日本廣島大學相關人員創辦的高新技術企業在廣島市內召開新車發佈會,展示了一款最新研發的,車體附有特殊海綿袋,“無論撞上多少次,車身都會恢復原樣”。

這款電動汽車名為“i SAVE YOU”。身為公司社長的廣島大學大學院教授升島努表示:“這款汽車是為老年人和主婦設計的”。該電動汽車為三輪,最高時速50公里。帳篷質地的袋子裡塞入海綿,然後覆蓋在車身四周吸收撞擊時的衝擊力。若發生碰撞時的車速為每小時約10公里,則完全不會對車身產生影響,搭乘者也不會受傷。充電1次可行駛約30公里,駕駛員須持有普通駕照。

這款汽車的研發得到了馬自達公司資深技術人員的協助、以及縣內中小企業的支持,耗時4年左右才完成。車輛售價為89萬日元和79萬日元兩種,銷售目標為一年1千輛。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

飛度混動版廣州車展正式上市售價17.98萬元

在廣州車展上,飛度版正式上市,售價17.98萬元。新車以進口的方式引入中國,依託在廣汽本田的銷售管道進行銷售。

外觀方面,混合動力飛度與普通版車型略有不同:混動車型的中網鍍鉻裝飾的面積更大,前大燈樣式有所改進,增加了藍邊,並採用了新的前進氣格柵和保險杠,尾部燈組為透明設計,後備箱下方增加鍍鉻裝飾。

車身尺寸方面與普通版飛度較為接近,長寬高分別為3900mm/1695mm/1525mm,但整備品質比普通版車型重了約60Kg,達到1070Kg。此外,飛度混動版的輪胎尺寸變為175/65R14,有利於燃油經濟性的提升。

內飾部分變化不大,第二排座椅可以折疊,儲物空間與常規動力車型相同,同時還配備了有助於降低油耗的Eco Assist節能駕駛輔助系統。

動力方面,混合動力版飛度一台1.3L四缸SOHCi-VTEC汽油引擎與一套並聯混動系統,其中汽油發動機最大功率88hp/5800rpm,峰值扭矩為121Nm,並與之匹配CVT變速箱;混合動力系統採用與本田另一款混合動力車Insight相同的IMA混合動力系統(Integrated Motor Assist,整合電機輔助系統),電動馬達最大功率14馬力,峰值扭矩78牛米,油耗僅為3.6升/百公里。

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧