Nginx 如何自定義變量?

之前的兩篇文章 Nginx 變量介紹以及利用 Nginx 變量做防盜鏈 講的是 Nginx 有哪些變量以及一個常見的應用。那麼如此靈活的 Nginx 怎麼能不支持自定義變量呢,今天的文章就來說一下自定義變量的幾個模塊以及 Nginx 的 keepalive 特性。

通過映射新變量提供更多的可能性:map 模塊

  • 功能:基於已有變量,使用類似 switch {case: … default: …} 的語法創建新變量,為其他基於變量值實現功能的模塊提供更多的可能性
  • 模塊:ngx_http_map_module 默認編譯進 Nginx,通過 --without-http_map_module 禁用

指令

Syntax: map string $variable { ... }
Default: —
Context: http

Syntax: map_hash_bucket_size size;
Default: map_hash_bucket_size 32|64|128; 
Context: http

Syntax: map_hash_max_size size;
Default: map_hash_max_size 2048; 
Context: http

我們主要看一下 map string $variable { ... } 這個指令。所謂類似 switch case 的語法是指,string 的值可以有多個,可以根據 string 值的不同,來給 $variable 賦不同的值。

規則

  • 已有變量:string 需要是已有的變量,可以分為下面這三種情況
    • 字符串
    • 一個或者多個變量
    • 變量與字符串的組合
  • case 規則:{…} 內的匹配規則需要遵循以下規則,尤其是要注意當使用 hostnames 指令時,與 server name 的匹配規則是一致的,可以看之前的文章 Nginx 的配置指令
    • 字符串嚴格匹配
    • 使用 hostnames 指令,可以對域名使用前綴 * 泛域名匹配
    • ~ 和 ~* 正則表達式匹配,後者忽略大小寫
  • default 規則
    • 沒有匹配到任何規則時,使用 default
    • 確實 default 時,返回空字符串給新變量
  • 其他
    • 使用 include 語法提升可讀性
    • 使用 volatile 禁止變量值緩存

大家看到上面這些規則可能都有些暈,廢話不多說,直接來看一個實戰配置文件就懂了。

實戰

這裏我們有一個配置文件,在這個文件裏面我們定義了兩個 map 塊,分別配置了兩個變量,$name 和 $mobile,$name 中包含 hostnames 指令。

map $http_host $name {
    hostnames;

    default       0;

    ~map\.ziyang\w+\.org.cn 1;
    *.ziyang.org.cn   2;
    map.ziyang.com   3;
    map.ziyang.*    4;
}

map $http_user_agent $mobile {
    default       0;
    "~Opera Mini" 1;
}

server {
	listen 10001;
	default_type text/plain;
	location /{
		return 200 '$name:$mobile\n';
	}
}

下面看一下實際的請求:

  test_nginx curl -H "Host: map.ziyang.org.cn" 127.0.0.1:10001
2:0

為什麼會返回 2:0 呢?我們來看一下匹配順序。

map.ziyang.org.cn 有三個規則可以生效,分別是:

  • ~map.ziyang\w+.org.cn 1;
  • *.ziyang.org.cn 2;
  • map.ziyang.* 4;

而泛域名是優先於正則表達式的,* 在前的泛域名優先於在後面的泛域名,因此最終匹配到的就是:

  • *.ziyang.org.cn 2;

而第二個變量 $mobile 自然走的是 default 規則,不用多說。

這就是 map 模塊的作用,大家可以多嘗試一下。

下面再來看一個與 map 模塊有點類似的 split_clients 模塊,這個模塊也是通過生成新的變量來完成 AB 測試功能的,它可以按照變量的值,按照百分比的方式,生成新的變量。

實現 AB 測試:split_clients 模塊

  • 功能:基於已有變量創建新變量,為其他 AB 測試提供更多的可能性
    • 對已有變量的值執行 MurmurHash2 算法,得到 32 位整形哈希数字,記為 hash
    • 32 位無符號整形的最大数字 2^32-1,記為 max
    • 哈希数字與最大数字相除,hash/max,可以得到百分比 percent
    • 配置指令中指示了各個百分比構成的範圍,如 0-1%,1%-5% 等,及範圍對應的值
    • 當 percent 落在哪個範圍里,新變量的值就對應着其後的參數
  • 模塊:ngx_http_split_clients_module,默認編譯進 Nginx,通過 --without-http_split_clients_module 禁用

規則

  • 已有變量
    • 字符串
    • 一個或者多個變量
    • 變量與字符串的組合
  • case 規則:
    • xx.xx%,支持小數點后 2 位,所有項的百分比相加不能超過 100%
    • *,由它匹配剩餘的百分比(100% 減去以上所有項相加的百分比)

指令

Syntax: split_clients string $variable { ... }
Default: —
Context: http

split_clients 的指令與 map 是非常相似的,可以看一下前面的介紹,這裏不再贅述了。

下面這個配置,來看下有沒有啥問題:

split_clients "${http_testcli}" $variant {
    0.51% .one;
    20.0% .two;
    50.5% .three;
    40% .four;
    * "";
}

細心的同學可能已經發現了,所有的百分比相加已經超過了 100%,所以 Nginx 直接會拋出一個錯誤,禁止執行。

  test_nginx ./sbin/nginx -s reload
nginx: [emerg] percent total is greater than 100% in /Users/mtdp/myproject/nginx/test_nginx/conf/example/17.map.conf:31

然後將 40% .four; 這一行給屏蔽掉再試試看:

  test_nginx curl -H "testcli: split_clients.ziyang.com" --resolve "split_clients.ziyang.com:80:127.0.0.1" http://split_clients.ziyang.com
ABtestfile.three

正常執行。

geo 模塊

geo 模塊與前面兩個模塊也很相似,不同之處在於,這個模塊是基於 IP 地址或者子網掩碼這樣的變量值來生成新的變量的。

  • 功能:根據 IP 地址創建新變量

  • 模塊: ngx_http_geo_module,默認編譯進 Nginx,通過 --without-http_geo_module 禁用

  • 指令

Syntax: geo [$address] $variable { ... }
Default: —
Context: http

規則

  • 如果 geo 指令后不輸入 $address,那麼默認使用 $remote_addr 變量作為 IP 地址

  • {} 內的指令匹配:優先最長匹配

    • 通過 IP 地址及子網掩碼的方式,定義 IP 範圍,當 IP 地址在範圍內時新變量使用其後的參數值

    • default 指定了當以上範圍都未匹配上時,新變量的默認值

    • 通過 proxy 指令指定可信地址(參考 realip 模塊),此時 remote_addr 的值為 X-Forwarded-For 頭部值中最後一個 IP 地址

    • proxy_recursive 允許循環地址搜索

    • include,優化可讀性

    • delete 刪除指定網絡

geo $country {
default ZZ;
#include conf/geo.conf;
#proxy 172.18.144.211;
127.0.0.0/24 US;
127.0.0.1/32 RU;
10.1.0.0/16 RU;
192.168.1.0/24 UK;
}


問題:以下命令執行時,變量 country 的值各為多少?(proxy 實際上為客戶端地址,這裏設置為本機的局域網地址即可,我這裡是 172.18.144.211)

curl -H ‘X-Forwarded-For: 10.1.0.0,127.0.0.2’ geo.ziyang.com
curl -H ‘X-Forwarded-For: 10.1.0.0,127.0.0.1’ geo.ziyang.com
curl -H ‘X-Forwarded-For: 10.1.0.0,127.0.0.1,1.2.3.4’ geo.ziyang.com


結果如下:

```shell
  test_nginx curl -H 'X-Forwarded-For: 10.1.0.0,127.0.0.2' geo.ziyang.com
US
  test_nginx curl -H 'X-Forwarded-For: 10.1.0.0,127.0.0.1' geo.ziyang.com
RU
  test_nginx curl -H 'X-Forwarded-For: 10.1.0.0,127.0.0.1,1.2.3.4' geo.ziyang.com
ZZ

這裏可以看出來,匹配規則實際上是遵循最長匹配的規則的。

geoip 模塊

geoip 模塊可以根據 IP 地址生成對應的地址變量,用法與前面的也都類似,Nginx 是基於 MaxMind 數據庫來生成對應的地址的。

  • 功能:根據 IP 地址創建新變量
  • 模塊: ngx_http_geoip_module,默認未編譯進 Nginx,通過 --with-http_geoip_module 禁用

使用這個模塊是需要安裝 MaxMind 庫的,安裝步驟如下:

  • 安裝 MaxMind 里 geoip 的 C 開發庫(https://dev.maxmind.com/geoip/legacy/downloadable/ )
  • 編譯 Nginx 時帶上 --with-http_geoip_module 參數
  • 下載 MaxMind 中的二進制地址庫,這個地址庫是需要在指令中指定對應的地址的
  • 使用 geoip_country 或者 geoip_city 指令配置好 nginx.conf
  • 運行或者升級 Nginx

geoip_country 指令提供的變量

指令

Syntax: geoip_country file; # 指定國家類的地址文件
Default: —
Context: http

Syntax: geoip_proxy address | CIDR;
Default: —
Context: http

變量

  • $geoip_country_code:兩個字母的國家代碼,比如 CN 或者 US
  • $geoip_country_code3:三個字母的國家代碼,比如 CHN 或者 USA
  • $geoip_country_name:國家名稱,例如 “China”, “United States”

geoip_city 指令提供的變量

指令

Syntax: geoip_city file;
Default: —
Context: http

變量

  • $geoip_latitude:緯度
  • $geoip_longitude:經度
  • $geoip_city_continent_code:位於全球哪個洲,例如 EU 或 AS
  • 與 $geoip_country 指令生成的變量重疊
    • $geoip_country_code:兩個字母的國家代碼,比如 CN 或者 US
    • $geoip_country_code3:三個字母的國家代碼,比如 CHN 或者 USA
    • $geoip_country_name:國家名稱,例如 “China”, “United States”
  • $geoip_region:洲或者省的編碼,例如 02
  • $geoip_region_name:洲或者省的名稱,例如 Zhejiang 或者 Saint Petersburg
  • $geoip_city:城市名
  • $geoip_postal_code:郵編號
  • $geoip_area_code:僅美國使用的郵編號,例如 408
  • $geoip_dma_code:僅美國使用的 DMA 編號,例如 807

keepalive 模塊

前面說的都是 Nginx 的變量相關的內容,其實 Nginx 還有一個很具有特色的模塊,那就是 keepalive 模塊,由於內容不是很多,所以我就直接寫到這篇文章裏面了,單寫一篇顯得內容不夠哈。

這裏指的是 HTTP 的 keepalive,TCP 也有 keepalive,後面會說。

而且是對客戶端的 keepalive,不是對上游服務器的。

  • 功能:多個 HTTP 請求通過復用 TCP 連接,可以實現以下功能:

    • 減少握手次數
    • 通過減少併發連接數減少了服務器資源消耗
    • 降低 TCP 擁塞控制的影響,保證滑動窗口維持在一個最優的大小
  • Connection 頭部

    • close:表示請求處理完就關閉連接
    • keepalive:表示復用連接處理下一條請求
  • Keepalive 頭部:timeout=n,單位是秒,表示連接至少保持 n 秒

指令

對客戶端行為控制的指令:

Syntax: keepalive_disable none | browser ...;
Default: keepalive_disable msie6; 
Context: http, server, location

Syntax: keepalive_requests number;
Default: keepalive_requests 100; 
Context: http, server, location

Syntax: keepalive_timeout timeout [header_timeout];
Default: keepalive_timeout 75s; 
Context: http, server, location
  • keepalive_disable 設置為 none 表示對所有瀏覽器啟用 keepalive,msie6 表示在老版本 MSIE 上禁用 keepalive
  • keepalive_requests 設置允許保持 keepalive 的請求的數量
  • keepalive_timeout 表示超時時間

好了,關於 Nginx 的模塊介紹就已經全部介紹完了,有興趣的同學可以去翻我前面的系列文章。當然還有一部分重要的內容還沒有介紹,那就是關於 Nginx 的反向代理和負載均衡部分,這塊咱們單獨抽出來說,別著急,馬上乾貨就出來。

本文首發於我的個人博客:iziyang.github.io,所有配置文件我已經放在了 Nginx 配置文件,大家可以自取。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

使用SpringCloud Stream結合rabbitMQ實現消息消費失敗重發機制

前言:實際項目中經常遇到消息消費失敗了,要進行消息的重發。比如支付消息消費失敗后,要分不同時間段進行N次的消息重發提醒。

本文模擬場景

  1. 當金額少於100時,消息消費成功
  2. 當金額大於100,小於200時,會進行3次重發,第一次1秒;第二次2秒;第三次3秒。
  3. 當金額大於200時,消息消費失敗,會進行5次重發,第一次1秒;第二次2秒;第三次3秒;第四次4秒;第五次5秒。重試五次后,消息自動進入死信隊列,在死信隊列存活60秒后消失。

代碼實例

特別注意代碼與配置文件中的註釋,各個使用說明都已經詳細寫在配置文件中

pom包引入

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.12.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.cloudstream</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR5</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- ①關鍵配置:引入stream-rabbit 依賴-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <!-- ②關鍵配置:由於stream是基於spring-cloud的,所以這裏要引入 -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

    <repositories>
        <repository>
            <id>spring-snapshots</id>
            <name>Spring Snapshots</name>
            <url>https://repo.spring.io/snapshot</url>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>spring-milestones</id>
            <name>Spring Milestones</name>
            <url>https://repo.spring.io/milestone</url>
        </repository>
    </repositories>

</project>

配置application.yml文件

注意各個配置的縮進格式,別搞錯了

server:
  port: 8081
spring:
  application:
    name: stream-demo
  #rabbitmq連接配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: admin
    password: 123456
  cloud:
    stream:
      bindings:
        #消息生產者,與DelayDemoTopic接口中的DELAY_DEMO_PRODUCER變量值一致
        delay-demo-producer:
          #①定義交換機名
          destination: demo-delay-queue
        #消息消費者,與DelayDemoTopic接口中的DELAY_DEMO_CONSUMER變量值一致
        delay-demo-consumer:
          #定義交換機名,與①一致,就可以使發送和消費都指向一個隊列
          destination: demo-delay-queue
          #分組,這個配置可以開啟消息持久化、可以解決在集群環境下重複消費的問題。
          #比如A、B兩台服務器集群,如果沒有這個配置,則A、B都能收到同樣的消息,如果有該配置則只有其中一台會收到消息
          group: delay-consumer-group
          consumer:
            #最大重試次數,默認為3。不使用默認的,這裏定義為1,由我們程序控制發送時間和次數
            maxAttempts: 1
      rabbit:
        bindings:
          #消息生產者,與DelayDemoTopic接口中的DELAY_DEMO_PRODUCER變量值一致
          delay-demo-producer:
            producer:
              #②申明為延遲隊列
              delayedExchange: true
          #消息消費者,與DelayDemoTopic接口中的DELAY_DEMO_CONSUMER變量值一致
          delay-demo-consumer:
            consumer:
              #申明為延遲隊列,與②的配置的成對出現的
              delayedExchange: true
              #開啟死信隊列
              autoBindDlq: true
              #死信隊列中消息的存活時間
              dlqTtl: 60000

定義隊列通道

  1. 定義通道
/**
 * 定義延遲消息通道
 */
public interface DelayDemoTopic {
    /**
     * 生產者,與yml文件配置對應
     */
    String DELAY_DEMO_PRODUCER = "delay-demo-producer";
    /**
     * 消費者,與yml文件配置對應
     */
    String DELAY_DEMO_CONSUMER = "delay-demo-consumer";

    /**
     * 定義消息消費者,在@StreamListener監聽消息的時候用到
     * @return
     */
    @Input(DELAY_DEMO_CONSUMER)
    SubscribableChannel delayDemoConsumer();

    /**
     * 定義消息發送者,在發送消息的時候用到
     * @return
     */
    @Output(DELAY_DEMO_PRODUCER)
    MessageChannel delayDemoProducer();
}
  1. 綁定通道
/**
 * 配置消息的binding
 *
 */
@EnableBinding(value = {DelayDemoTopic.class})
@Component
public class MessageConfig {

}

消息發送模擬

/**
 * 發送消息
 */
@RestController
public class SendMessageController {
    @Autowired
    DelayDemoTopic delayDemoTopic;

    @GetMapping("send")
    public Boolean sendMessage(BigDecimal money) throws JsonProcessingException {

        Message<BigDecimal> message = MessageBuilder.withPayload(money)
                //設置消息的延遲時間,首次發送,不設置延遲時間,直接發送
                .setHeader(DelayConstant.X_DELAY_HEADER,0)
                //設置消息已經重試的次數,首次發送,設置為0
                .setHeader(DelayConstant.X_RETRIES_HEADER,0)
                .build();
        return delayDemoTopic.delayDemoProducer().send(message);
    }
}

消息監聽處理

@Component
@Slf4j
public class DelayDemoTopicListener {
    @Autowired
    DelayDemoTopic delayDemoTopic;

    /**
     * 監聽延遲消息通道中的消息
     * @param message
     */
    @StreamListener(value = DelayDemoTopic.DELAY_DEMO_CONSUMER)
    public void listener(Message<BigDecimal> message) {
        //獲取重試次數
        int retries = (int)message.getHeaders().get(DelayConstant.X_RETRIES_HEADER);
        //獲取消息內容
        BigDecimal money = message.getPayload();
        try {
            String now = DateUtils.formatDate(new Date(),"yyyy-MM-dd HH:mm:ss");
            //模擬:如果金額大於200,則消息無法消費成功;金額如果大於100,則重試3次;如果金額小於100,直接消費成功
            if (money.compareTo(new BigDecimal(200)) == 1){
                throw new RuntimeException(now+":金額超出200,無法交易。");
            }else if (money.compareTo(new BigDecimal(100)) == 1 && retries <= 3) {
                if (retries == 0) {
                    throw new RuntimeException(now+":金額超出100,消費失敗,將進入重試。");
                }else {
                    throw new RuntimeException(now+":金額超出100,當前第" + retries + "次重試。");
                }
            }else {
                log.info("消息消費成功!");
            }
        }catch (Exception e) {
            log.error(e.getMessage());
            if (retries < DelayConstant.X_RETRIES_TOTAL){
                //將消息重新塞入隊列
                MessageBuilder<BigDecimal> messageBuilder = MessageBuilder.fromMessage(message)
                        //設置消息的延遲時間
                        .setHeader(DelayConstant.X_DELAY_HEADER,DelayConstant.ruleMap.get(retries + 1))
                        //設置消息已經重試的次數
                        .setHeader(DelayConstant.X_RETRIES_HEADER,retries + 1);
                Message<BigDecimal> reMessage = messageBuilder.build();
                //將消息重新發送到延遲隊列中
                delayDemoTopic.delayDemoProducer().send(reMessage);
            }else {
                //超過重試次數,做相關處理(比如保存數據庫等操作),如果拋出異常,則會自動進入死信隊列
                throw new RuntimeException("超過最大重試次數:" + DelayConstant.X_RETRIES_TOTAL);
            }
        }
    }
}

規則定義

目前寫在一個常量類里,實際項目中,通常會配置在配置文件中

public class DelayConstant {
    /**
     * 定義當前重試次數
     */
    public static final String X_RETRIES_HEADER = "x-retries";
    /**
     * 定義延遲消息,固定值,該配置放到消息的header中,會開啟延遲隊列
     */
    public static final String X_DELAY_HEADER = "x-delay";

    /**
     * 定義最多重試次數
     */
    public static final Integer X_RETRIES_TOTAL = 5;

    /**
     * 定義重試規則,毫秒為單位
     */
    public static final Map<Integer,Integer> ruleMap = new HashMap(){{
        put(1,1000);
        put(2,2000);
        put(3,3000);
        put(4,4000);
        put(5,5000);
    }};
}

測試

經過以上配置和實現就可完成模擬的重發場景。

  • 瀏覽器中輸入http://127.0.0.1:8081/send?money=10,可以看到控制台中輸出:
消息消費成功!
  • 瀏覽器中輸入http://127.0.0.1:8081/send?money=110,可以看到控制台中輸出:
2020-06-20 10:59:42:金額超出100,消費失敗,將進入重試。
2020-06-20 10:59:43:金額超出100,當前第1次重試。
2020-06-20 10:59:45:金額超出100,當前第2次重試。
2020-06-20 10:59:48:金額超出100,當前第3次重試。
消息消費成功!

  • 瀏覽器中輸入http://127.0.0.1:8081/send?money=110,可以看到控制台中輸出:

注意事項

由於本文用到了延遲隊列,需要在rabbitMQ中安裝延遲插件,具體安裝方式,可以查看:延遲隊列安裝參考

源碼獲取

以上示例都可以通過我的GitHub獲取完整的代碼.

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

springboot + rabbitmq 做智能家居,我也沒想到會這麼簡單

本文收錄在個人博客:www.chengxy-nds.top,共享技術資源,共同進步

前一段有幸參与到一個智能家居項目的開發,由於之前都沒有過這方面的開發經驗,所以對智能硬件的開發模式和技術棧都頗為好奇。

產品是一款可燃氣體報警器,如果家中燃氣泄露濃度到達一定閾值,報警器檢測到並上傳氣體濃度值給後台,後台以電話、短信、微信等方式,提醒用戶家中可能有氣體泄漏。

用戶還可能向報警器發一些關閉報警、調整音量的指令等。整體功能還是比較簡單的,大致的邏輯如下圖所示:

但當我真正的參与其中開發時,其實有一點小小的失望,因為在整個研發過程中,並沒用到什麼新的技術,還是常規的幾種中間件,只不過換個用法而已。

技術選型用rabbitmq 來做核心的組件,主要考慮到運維成本低,組內成員使用的熟練度比較高。

下面和小夥伴分享一下如何用 springboot + rabbitmq 搭建物聯網(IOT)平台,其實智能硬件也沒想象的那麼高不可攀!

很多小夥伴可能有點懵?rabbitmq 不是消息隊列嗎?怎麼又能做智能硬件了

其實rabbitmq有兩種協議,我們平時接觸的消息隊列是用的AMQP協議,而用在智能硬件中的是MQTT協議。

一、什麼是 MQTT協議?

MQTT 全稱(Message Queue Telemetry Transport):一種基於發布/訂閱(publish/subscribe)模式的輕量級通訊協議,通過訂閱相應的主題來獲取消息,是物聯網(Internet of Thing)中的一個標準傳輸協議。

該協議將消息的發布者(publisher)與訂閱者(subscriber)進行分離,因此可以在不可靠的網絡環境中,為遠程連接的設備提供可靠的消息服務,使用方式與傳統的MQ有點類似。

TCP協議位於傳輸層,MQTT 協議位於應用層,MQTT 協議構建於TCP/IP協議上,也就是說只要支持TCP/IP協議棧的地方,都可以使用MQTT協議。

二、為什麼要用 MQTT協議?

MQTT協議為什麼在物聯網(IOT)中如此受偏愛?而不是其它協議,比如我們更為熟悉的 HTTP協議呢?

  • 首先HTTP協議它是一種同步協議,客戶端請求后需要等待服務器的響應。而在物聯網(IOT)環境中,設備會很受制於環境的影響,比如帶寬低、網絡延遲高、網絡通信不穩定等,顯然異步消息協議更為適合IOT應用程序。

  • HTTP是單向的,如果要獲取消息客戶端必須發起連接,而在物聯網(IOT)應用程序中,設備或傳感器往往都是客戶端,這意味着它們無法被動地接收來自網絡的命令。

  • 通常需要將一條命令或者消息,發送到網絡上的所有設備上。HTTP要實現這樣的功能不但很困難,而且成本極高。

三、MQTT協議介紹

前邊說過MQTT是一種輕量級的協議,它只專註於發消息, 所以此協議的結構也非常簡單。

MQTT數據包

MQTT協議中,一個MQTT數據包由:固定頭(Fixed header)、 可變頭(Variable header)、 消息體(payload)三部分構成。

  • 固定頭(Fixed header),所有數據包中都有固定頭,包含數據包類型及數據包的分組標識。
  • 可變頭(Variable header),部分數據包類型中有可變頭。
  • 內容消息體(Payload),存在於部分數據包類,是客戶端收到的具體消息內容。

1、固定頭

固定頭部,使用兩個字節,共16位:

(4-7)位表示消息類型,使用4位二進製表示,可代表如下的16種消息類型,不過 0 和 15位置屬於保留待用,所以共14種消息事件類型。

DUP Flag(重試標識)

DUP Flag:保證消息可靠傳輸,消息是否已送達的標識。默認為0,只佔用一個字節,表示第一次發送,當值為1時,表示當前消息先前已經被傳送過。

QoS Level(消息質量等級)

QoS Level:消息的質量等級,後邊會詳細介紹

RETAIN(持久化)

  • 值為1:表示發送的消息需要一直持久保存,而且不受服務器重啟影響,不但要發送給當前的訂閱者,且以後新加入的客戶端訂閱了此Topic,訂閱者也會馬上得到推送。
    注意:新加入的訂閱者,只會取出最新的一個RETAIN flag = 1的消息推送。

  • 值為0:僅為當前訂閱者推送此消息。

Remaining Length(剩餘長度)

在當前消息中剩餘的byte(字節)數,包含可變頭部和消息體payload。

2、可變頭

固定頭部僅定義了消息類型和一些標誌位,一些消息的元數據需要放入可變頭部中。可變頭部內容字節長度 + 消息體payload = 剩餘長度。

可變頭部居於固定頭部和payload中間,包含了協議名稱,版本號,連接標誌,用戶授權,心跳時間等內容。

可變頭存在於這些類型的消息:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。

3、消息體payload

消息體payload只存在於CONNECTPUBLISHSUBSCRIBESUBACKUNSUBSCRIBE這幾種類型的消息:

  • CONNECT:包含客戶端的ClientId、訂閱的TopicMessage以及用戶名密碼
  • PUBLISH:向對應主題發送消息。
  • SUBSCRIBE:要訂閱的主題以及QoS
  • SUBACK:服務器對於SUBSCRIBE所申請的主題及QoS進行確認和回復。
  • UNSUBSCRIBE:取消要訂閱的主題。

消息質量(QoS )

消息質量(Quality of Service),即消息的發送質量,發布者(publisher)和訂閱者(subscriber)都可以指定qos等級,有QoS 0QoS 1QoS 2三個等級。

下邊分別說明一下這三個等級的區別。

1、Qos 0At most once(至多一次),只發送一次消息,不保證消息是否成功送達,沒有確認機制,消息可能會丟失或重複。

2、Qos 1At least once(至少一次),相對於QoS 0而言Qos 1增加了ack確認機制,發送者(publisher)推送消息到MQTT代理(broker)時,兩者自身都會先持久化消息,只有當publisher 或者 Broker分別收到 PUBACK確認時,才會刪除自身持久化的消息,否則就會重發。

但有個問題,儘管我們可以通過確認來保證一定收到客戶端 或 服務器的message,可我們卻不能保證僅收到一次message,也就是當客戶端publisher沒收到Brokerpuback或者 Broker沒有收到subscriberpuback,那麼就會一直重發。

publisher -> broker 大致流程:

  1. publisher store msg -> publish ->broker (傳遞message)
  2. broker -> puback -> publisher delete msg (確認傳遞成功)

3、Qos 2Exactly once(只有一次),相對於QoS 1QoS 2升級實現了僅接受一次messagepublisherbroker 同樣對消息進行持久化,其中 publisher 緩存了message和 對應的msgID,而 broker 緩存了 msgID,可以保證消息不重複,由於又增加了一個confirm 機制,整個流程變得複雜很多。

publisher -> broker 大致流程:

  1. publisher store msg -> publish ->broker -> broker store
  2. msgID(傳遞message) broker -> puberc (確認傳遞成功)
  3. publisher -> pubrel ->broker delete msgID (告訴broker刪除msgID)
  4. broker -> pubcomp -> publisher delete msg (告訴publisher刪除msg)

LWT(最後遺囑)

LWT 全稱為 Last Will and Testament,其實遺囑是一個由客戶端預先定義好的主題和對應消息,附加在CONNECT的數據包中,包括遺願主題遺願 QoS遺願消息等。

當MQTT代理 Broker 檢測到有客戶端client非正常斷開連接時,再由服務器主動發布此消息,然後相關的訂閱者會收到消息。

舉個栗子:聊天室中所有人都訂閱一個叫talk的主題 ,但小富由於網絡抖動突然斷開了鏈接,這時聊天室中所有訂閱主題 talk的客戶端都會收到一個 “小富離開聊天室” 的遺願消息。

遺囑的相關參數:

  • Will Flag:是否使用 LWT,1 開啟
  • Will Topic:遺願主題名,不可使用通配符
  • Will Qos:發布遺願消息時使用的 QoS
  • Will Retain:遺願消息的 Retain 標識
  • Will Message:遺願消息內容

那客戶端Client 有哪些場景是非正常斷開連接呢?

  • Broker 檢測到底層的 I/O 異常;
  • 客戶端 未能在心跳 Keep Alive 的間隔內和 Broker 進行消息交互;
  • 客戶端 在關閉底層 TCP 連接前沒有發送 DISCONNECT 數據包;
  • 客戶端 發送錯誤格式的數據包到 Broker,導致關閉和客戶端的連接等。

注意:當客戶端通過發布 DISCONNECT 數據包斷開連接時,屬於正常斷開連接,並不會觸發 LWT 的機制,與此同時Broker 還會丟棄掉當前客戶端在連接時指定的相關 LWT 參數。

四、MQTT協議應用場景

MQTT協議廣泛應用於物聯網、移動互聯網、智能硬件、車聯網、電力能源等領域。使用的場景也是非常非常多,下邊列舉一些:

  • 物聯網M2M通信,物聯網大數據採集
  • Android消息推送,WEB消息推送
  • 移動即時消息,例如Facebook Messenger
  • 智能硬件、智能傢具、智能電器
  • 車聯網通信,電動車站樁採集
  • 智慧城市、遠程醫療、遠程教育
  • 電力、石油與能源等行業市場

五、代碼實現

具體 rabbitmq 的環境搭建就不贅述了,網上教程比較多,有條件的用服務器,沒條件的像我搞個Windows版的也很快樂嘛。

1、啟用 rabbitmq的mqtt協議

我們先開啟 rabbitmqmqtt協議,因為默認安裝下是關閉的,命令如下:

rabbitmq-plugins enable rabbitmq_mqtt

2、mqtt 客戶端依賴包

上一步中安裝rabbitmq環境並開啟 mqtt協議后,實際上mqtt 消息代理服務就搭建好了,接下來要做的就是實現客戶端消息的推送和訂閱。

這裏使用spring-integration-mqttorg.eclipse.paho.client.mqttv3兩個工具包實現。

<!--mqtt依賴包-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
    <groupId>org.eclipse.paho</groupId>
       <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.0</version>
</dependency>

3、消息發送者

消息的發送比較簡單,主要是應用到@ServiceActivator註解,需要注意messageHandler.setAsync屬性,如果設置成false,關閉異步模式發送消息時可能會阻塞。

@Configuration
public class IotMqttProducerConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(mqttConfig.getServers());
        return factory;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "iotMqttInputChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(mqttConfig.getServerClientId(), mqttClientFactory());
        messageHandler.setAsync(false);
        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
        return messageHandler;
    }
}

MQTT 對外提供發送消息的API時,需要使用@MessagingGateway 註解,去提供一個消息網關代理,參數defaultRequestChannel 指定發送消息綁定的channel

可以實現三種API接口,payload 為發送的消息,topic 發送消息的主題,qos 消息質量。

@MessagingGateway(defaultRequestChannel = "iotMqttInputChannel")
public interface IotMqttGateway {

    // 向默認的 topic 發送消息
    void sendMessage2Mqtt(String payload);
    // 向指定的 topic 發送消息
    void sendMessage2Mqtt(String payload,@Header(MqttHeaders.TOPIC) String topic);
    // 向指定的 topic 發送消息,並指定服務質量參數
    void sendMessage2Mqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

4、消息訂閱

消息訂閱和我們平時用的MQ消息監聽實現思路基本相似,@ServiceActivator註解表明當前方法用於處理MQTT消息,inputChannel 參數指定了用於接收消息的channel

/**
 * @Author: xiaofu
 * @Description: 消息訂閱配置
 * @date 2020/6/8 18:24
 */
@Configuration
public class IotMqttSubscriberConfig {

    @Autowired
    private MqttConfig mqttConfig;

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setServerURIs(mqttConfig.getServers());
        return factory;
    }

    @Bean
    public MessageChannel iotMqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(iotMqttInputChannel());
        return adapter;
    }

    /**
     * @author xiaofu
     * @description 消息訂閱
     * @date 2020/6/8 18:20
     */
    @Bean
    @ServiceActivator(inputChannel = "iotMqttInputChannel")
    public MessageHandler handlerTest() {

        return message -> {
            try {
                String string = message.getPayload().toString();
                System.out.println("接收到消息:" + string);
            } catch (MessagingException ex) {
                //logger.info(ex.getMessage());
            }
        };
    }
}

六、測試消息

額~ 由於本渣渣對硬件一竅不通,為了模擬硬件的發送消息,只能藉助一下工具,其實硬件端實現MQTT協議,跟我們前邊的基本沒什麼區別,只不過換種語言嵌入到硬件中而已。

這裏選的測試工具為mqttbox,下載地址:http://workswithweb.com/mqttbox.html

1、測試消息發送

我們用先用mqttbox模擬向主題mqtt_test_topic發送消息,看後台是否能成功接收到。

看到後台成功拿到了向主題mqtt_test_topic發送的消息。

2、測試消息訂閱

mqttbox模擬訂閱主題mqtt_test_topic,在後台向主題mqtt_test_topic發送一條消息,這裏我簡單的寫了個controller調用API發送消息。

http://127.0.0.1:8080/fun/testMqtt?topic=mqtt_test_topic&message=我是後台向主題 mqtt_test_topic 發送的消息

我們看mqttbox的訂閱消息,已經成功的接收到了後台的消息,到此我們的MQTT通信環境就算搭建成功了。如果把mqttbox工具換成具體硬件設備,整個流程就是我們常說的智能家居了,其實真的沒那麼難。

七、應用注意事項

在我們實際的生產環境中遇到過的問題,這裏分享一下讓大家少踩坑。

clientId 要唯一

在客戶端connect連接的時,會有一個clientId 參數,需要每個客戶端都保持唯一的。但我們在開發測試階段clientId直接在代碼中寫死了,而且服務都是單實例部署,並沒有暴露出什麼問題。

MqttPahoMessageDrivenChannelAdapter(mqttConfig.getClientId(), mqttClientFactory(), mqttConfig.getDefaultTopic());

然而在生產環境內側的時候,由於服務是多實例集群部署,結果出現了下邊的奇怪問題。同一時間內只能有一個客戶端能拿到消息,其他客戶端不但不能消費消息,而且還在不斷的掉線重連:Lost connection: 已斷開連接; retrying...

這就是由於clientId相同導致客戶端間相互競爭消費,最後將clientId獲取方式換成從發號器中拿,問題就好了,所以這個地方是需要特別注意的。

平時程序在開發環境沒問題,可偏偏到了生產環境就一大堆問題,很多都是因為服務部署方式不同導致的。所以多學習分佈式還是很有必要的。

八、其他中間件

MQTT它只是一種協議,支持MQTT協議的消息中間件產品非常多,下邊的也只是其中的一部分

  • Mosquitto
  • Eclipse Paho
  • RabbitMQ
  • Apache ActiveMQ
  • HiveMQ
  • JoramMQ
  • ThingMQ
  • VerneMQ
  • Apache Apollo
  • emqttd Xively
  • IBM Websphere
    …..

總結

我也是第一次做和硬件相關的項目,之前聽到智能家居都會覺得好高大上,但實際上手開發后發現,技術嘛萬變不離其宗,也只是換種用法而已。

雙手奉上項目 demo 的github地址 :https://github.com/chengxy-nds/springboot-rabbitmq-mqtt.git

感興趣的小夥伴可以下載跑一跑,實現起來非常的簡單。

原創不易,燃燒秀髮輸出內容,希望你能有一丟丟收穫!

整理了幾百本各類技術电子書,送給小夥伴們,關注公號回復【666】自行領取。和一些小夥伴們建了一個技術交流群,一起探討技術、分享技術資料,旨在共同學習進步,如果感興趣就加入我們吧!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧

matplotlib 強化學習

matplotlib 強化學習

import matplotlib.pyplot as plt
...![](https://img2020.cnblogs.com/blog/1642028/202006/1642028-20200621111043462-144482637.png)


plt.show()		#显示圖像;下面都要寫,就不重複了

二維圖表

1. 基本圖表

  1. 用plot方法畫出x=(0,10)間sin的圖像
x = np.linspace(0, 10, 30)  #產生0-10之間的30個均勻數組
plt.plot(x, np.sin(x));		#以x為橫坐標,sin(x)為縱坐標打印出圖像

注:

  • linspace生成的是包含結尾的數組,比如0-10生成11個數才是0,1,2,3,4,5…
  • 生成10個數則是0,1.11111111, 2.22222222, 3.33333333, 4.44444444…;
  • 而arrange是不包含結尾的,0-10生成10個數是0,1,2,3…
  1. 用點,線的方式畫出x=(0,10)間sin的圖像
plt.plot(x, np.sin(x), '-o');
#'o’代表每個數據點用小圓圈表示,且數據點之前不用線連接,看起來很像散點圖
#'ro'代表小圓圈是紅色的
#'-'就是最普通的線型,數據點之間用實線連接。
#'--'設置線性為虛線

!

  1. 用scatter方法畫出x=(0,10)間sin的點圖像
plt.scatter(x, np.sin(x));		#散點圖

  1. 用餅圖的面積及顏色展示一組4維數據
rng = np.random.RandomState(0)
x = rng.randn(100)			#生成隨機數組
y = rng.randn(100)
colors = rng.rand(100)
sizes = 1000 * rng.rand(100)

plt.scatter(x, y, c=colors, s=sizes, alpha=0.3,
cmap='viridis')
plt.colorbar(); 			# 展示色階

繪製柱狀圖

x = [1,2,3,4,5,6,7,8]
y = [3,1,4,5,8,9,7,2]
label=['A','B','C','D','E','F','G','H']

plt.bar(x,y,tick_label = label);	#縱向升高
plt.barh(x,y,tick_label = label);	#換成橫向

直方圖

data = np.random.randn(1000) #生成1000個隨機數
plt.hist(data);				#畫出圖像

!

2. 自定義圖表元素

x = np.linspace(0,10,100)
plt.plot(x, np.sin(x))
plt.ylim(-1.5, 1.5);		#設置y軸显示範圍為(-1.5,1.5)
x = np.linspace(0.05, 10, 100)
y = np.sin(x)
plt.plot(x, y, label='sin(x)')
plt.xlabel('variable x');			#設置x,y軸標籤variable x,value y
plt.ylabel('value y');
plt.title('三角函數');					#設置圖表標題“三角函數”
plt.text(3.2, 0, 'sin(x)', weight='bold', color='r');	#註釋

plt.annotate('maximum',xy=(np.pi/2, 1),xytext=(np.pi/2+1, 1),weight='bold',color='r',arrowprops=dict(arrowstyle='->', connectionstyle='arc3', color='r'));					#箭頭標識

显示網格

x = np.linspace(0.05, 10, 100)
y = np.sin(x)
plt.plot(x, y)
plt.grid()

...
參數
matplotlin.pyplot.grid(b, which, axis, color, linestyle, linewidth, **kwargs) axis : 取值為‘both’, ‘x’,‘y’。就是想繪製哪個方向的網格線。不過我在輸入參數的時候發現如果輸入x或y的時候,             輸入的是哪條軸,則會隱藏哪條軸

color : 這就不用多說了,就是設置網格線的顏色。或者直接用c來代替color也可以。
plt.grid(c='g') 設置顏色為綠色

linestyle :也可以用ls來代替linestyle, 設置網格線的風格,是連續實線,虛線或者其它不同的線條。 | '-' | '--' | '-.' | ':' | 'None' | ' ' | '']
plt.grid(linestyle='-.')

linewidth : 設置網格線的寬度
...

繪製平行於x軸y=0.8的水平參考線

x = np.linspace(0.05, 10, 100)
y = np.sin(x)
plt.plot(x, y)
plt.axhline(y=0.8, ls='--', c='r')#水平參考線

3. 自定義圖像

在一張圖裡繪製sin,cos的圖形,並展示圖例

x = np.linspace(0, 10, 1000)
fig, ax = plt.subplots()

ax.plot(x, np.sin(x), label='sin')
ax.plot(x, np.cos(x), '--', label='cos')
ax.legend();

多子圖

在2個子圖中,显示sin(x)和cos(x)的圖像

fig = plt.figure()
ax1 = fig.add_axes([0.1, 0.5, 0.8, 0.4], ylim=(-1.2, 1.2))
ax2 = fig.add_axes([0.1, 0.1, 0.8, 0.4], ylim=(-1.2, 1.2))

x = np.linspace(0, 10)
ax1.plot(np.sin(x));
ax2.plot(np.cos(x));

for i in range(1, 7):		#用for創建6個子圖,並且在圖中標識出對應的子圖坐標
plt.subplot(2, 3, i)
plt.text(0.5, 0.5, str((2, 3, i)),fontsize=18, ha='center')

組合繪製大小不同的子圖

grid = plt.GridSpec(2, 3, wspace=0.4, hspace=0.3)
plt.subplot(grid[0, 0])
plt.subplot(grid[0, 1:])
plt.subplot(grid[1, :2])
plt.subplot(grid[1, 2]);

三維圖像

#38.創建一個三維畫布
from mpl_toolkits import mplot3d
fig = plt.figure()
ax = plt.axes(projection='3d')

#39.繪製一個三維螺旋線
ax = plt.axes(projection='3d')
# Data for a three-dimensional line
zline = np.linspace(0, 15, 1000)
xline = np.sin(zline)
yline = np.cos(zline)
ax.plot3D(xline, yline, zline);

#40.繪製一組三維點
ax = plt.axes(projection='3d')
zdata = 15 * np.random.random(100)
xdata = np.sin(zdata) + 0.1 * np.random.randn(100)
ydata = np.cos(zdata) + 0.1 * np.random.randn(100)
ax.scatter3D(xdata, ydata, zdata, c=zdata, cmap='Greens');

import numpy  as np
from matplotlib import pyplot  as plt
from mpl_toolkits.mplot3d import Axes3D
q1 = np.arange(0.01, 1, 0.01)
q2 = np.arange(0.01, 1 , 0.01)  #生成一位基底
q1, q2 = np.meshgrid(q1, q2)    #混合成二維數組,形成二維基底

pCDa = (1-q1)
pCDb = (np.sqrt((1-q1)**2+q1**2)-q1)
s_pCD = -q1* np.log2(q1) - (1-q1) * np.log2(1-q1)
Q_MID1 = s_pCD *q2 /q2        #AB或CD的關聯值,下圖是(s_x_pCD - s_pCD) *q2;  *q2/q2后才是圓柱體

fig = plt.figure()
ax = Axes3D(fig)
ax.plot_surface(q1,q2,Q_MID1)     #表面圖
ax.set_xlabel('value of q2')
ax.set_ylabel('value of q1')
ax.set_zlabel('the value of Q_MID1(pCD)')
plt.show()

#參數
ax.plot_surface(X, Y, Z, *args, **kwargs)
X,Y,Z:數據
rstride、cstride、rcount、ccount:同Wireframe plots定義
color:表面顏色
cmap:圖層

參考文獻:

  1. https://www.kesci.com/home/project/5de9f0a0953ca8002c95d2a9 50題matplotlib從入門到精通

  2. https://www.cnblogs.com/knightoffz/p/12933716.html 大創項目經歷

  3. https://matplotlib.org/mpl_toolkits/mplot3d/tutorial.html 官方文檔

  4. https://www.cnblogs.com/xingshansi/p/6777945.html 參考博客

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

11.DRF-權限

Django rest framework源碼分析(2)—-權限

添加權限

(1)API/utils文件夾下新建premission.py文件,代碼如下:

  • message是當沒有權限時,提示的信息
# utils/permission.py

class SVIPPremission(object):
    message = "必須是SVIP才能訪問"
    def has_permission(self,request,view):
        if request.user.user_type != 3:
            return False
        return True


class MyPremission(object):
    def has_permission(self,request,view):
        if request.user.user_type == 3:
            return False
        return True

(2)settings.py全局配置權限

#全局
REST_FRAMEWORK = {
    "DEFAULT_AUTHENTICATION_CLASSES":['API.utils.auth.Authentication',],
    "DEFAULT_PERMISSION_CLASSES":['API.utils.permission.SVIPPremission'],
}

(3)views.py添加權限

  • 默認所有的業務都需要SVIP權限才能訪問
  • OrderView類裏面沒寫表示使用全局配置的SVIPPremission
  • UserInfoView類,因為是普通用戶和VIP用戶可以訪問,不使用全局的,要想局部使用的話,裏面就寫上自己的權限類
  • permission_classes = [MyPremission,] #局部使用權限方法
from django.shortcuts import render,HttpResponse
from django.http import JsonResponse
from rest_framework.views import APIView
from API import models
from rest_framework.request import Request
from rest_framework import exceptions
from rest_framework.authentication import BaseAuthentication
from API.utils.permission import SVIPPremission,MyPremission

ORDER_DICT = {
    1:{
        'name':'apple',
        'price':15
    },
    2:{
        'name':'dog',
        'price':100
    }
}

def md5(user):
    import hashlib
    import time
    #當前時間,相當於生成一個隨機的字符串
    ctime = str(time.time())
    m = hashlib.md5(bytes(user,encoding='utf-8'))
    m.update(bytes(ctime,encoding='utf-8'))
    return m.hexdigest()

class AuthView(APIView):
    '''用於用戶登錄驗證'''

    authentication_classes = []      #裏面為空,代表不需要認證
    permission_classes = []          #不裏面為空,代表不需要權限
    def post(self,request,*args,**kwargs):
        ret = {'code':1000,'msg':None}
        try:
            user = request._request.POST.get('username')
            pwd = request._request.POST.get('password')
            obj = models.UserInfo.objects.filter(username=user,password=pwd).first()
            if not obj:
                ret['code'] = 1001
                ret['msg'] = '用戶名或密碼錯誤'
            #為用戶創建token
            token = md5(user)
            #存在就更新,不存在就創建
            models.UserToken.objects.update_or_create(user=obj,defaults={'token':token})
            ret['token'] = token
        except Exception as e:
            ret['code'] = 1002
            ret['msg'] = '請求異常'
        return JsonResponse(ret)


class OrderView(APIView):
    '''
    訂單相關業務(只有SVIP用戶才能看)
    '''

    def get(self,request,*args,**kwargs):
        self.dispatch
        #request.user
        #request.auth
        ret = {'code':1000,'msg':None,'data':None}
        try:
            ret['data'] = ORDER_DICT
        except Exception as e:
            pass
        return JsonResponse(ret)


class UserInfoView(APIView):
    '''
       訂單相關業務(普通用戶和VIP用戶可以看)
       '''
    permission_classes = [MyPremission,]    #不用全局的權限配置的話,這裏就要寫自己的局部權限
    def get(self,request,*args,**kwargs):

        print(request.user)
        return HttpResponse('用戶信息')
# urls.py
from django.contrib import admin
from django.urls import path
from API.views import AuthView,OrderView,UserInfoView

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/v1/auth/',AuthView.as_view()),
    path('api/v1/order/',OrderView.as_view()),
    path('api/v1/info/',UserInfoView.as_view()),
]
# API/utils/auth/py
# auth.py

from rest_framework import exceptions
from API import models
from rest_framework.authentication import BaseAuthentication


class Authentication(BaseAuthentication):
    '''用於用戶登錄驗證'''
    def authenticate(self,request):
        token = request._request.GET.get('token')
        token_obj = models.UserToken.objects.filter(token=token).first()
        if not token_obj:
            raise exceptions.AuthenticationFailed('用戶認證失敗')
        #在rest framework內部會將這兩個字段賦值給request,以供後續操作使用
        return (token_obj.user,token_obj)

    def authenticate_header(self, request):
        pass

(4)測試

普通用戶訪問OrderView,提示沒有權限

普通用戶訪問UserInfoView,可以返回信息

權限源碼流程

(1)dispatch

def dispatch(self, request, *args, **kwargs):
    """
    `.dispatch()` is pretty much the same as Django's regular dispatch,
    but with extra hooks for startup, finalize, and exception handling.
    """
    self.args = args
    self.kwargs = kwargs
    #對原始request進行加工,豐富了一些功能
    #Request(
    #     request,
    #     parsers=self.get_parsers(),
    #     authenticators=self.get_authenticators(),
    #     negotiator=self.get_content_negotiator(),
    #     parser_context=parser_context
    # )
    #request(原始request,[BasicAuthentications對象,])
    #獲取原生request,request._request
    #獲取認證類的對象,request.authticators
    #1.封裝request
    request = self.initialize_request(request, *args, **kwargs)
    self.request = request
    self.headers = self.default_response_headers  # deprecate?

    try:
        #2.認證
        self.initial(request, *args, **kwargs)

        # Get the appropriate handler method
        if request.method.lower() in self.http_method_names:
            handler = getattr(self, request.method.lower(),
                                  self.http_method_not_allowed)
        else:
            handler = self.http_method_not_allowed

        response = handler(request, *args, **kwargs)

    except Exception as exc:
        response = self.handle_exception(exc)

    self.response = self.finalize_response(request, response, *args, **kwargs)
    return self.response

(2)initial

def initial(self, request, *args, **kwargs):
    """
    Runs anything that needs to occur prior to calling the method handler.
    """
    self.format_kwarg = self.get_format_suffix(**kwargs)

    # Perform content negotiation and store the accepted info on the request
    neg = self.perform_content_negotiation(request)
    request.accepted_renderer, request.accepted_media_type = neg

    # Determine the API version, if versioning is in use.
    version, scheme = self.determine_version(request, *args, **kwargs)
    request.version, request.versioning_scheme = version, scheme

    # Ensure that the incoming request is permitted
    #4.實現認證
    self.perform_authentication(request)
    #5.權限判斷
    self.check_permissions(request)
    self.check_throttles(request)

(3)check_permissions

裏面有個has_permission這個就是我們自己寫的權限判斷

def check_permissions(self, request):
    """
    Check if the request should be permitted.
    Raises an appropriate exception if the request is not permitted.
    """
    #[權限類的對象列表]
    for permission in self.get_permissions():
        if not permission.has_permission(request, self):
            self.permission_denied(
                request, message=getattr(permission, 'message', None)
            )

(4)get_permissions

def get_permissions(self):
    """
    Instantiates and returns the list of permissions that this view requires.
    """
    return [permission() for permission in self.permission_classes]

(5)permission_classes

所以settings全局配置就如下

#全局
REST_FRAMEWORK = {
   "DEFAULT_PERMISSION_CLASSES":['API.utils.permission.SVIPPremission'],
}

內置權限

django-rest-framework內置權限BasePermission

默認是沒有限制權限

class BasePermission(object):
    """
    A base class from which all permission classes should inherit.
    """

    def has_permission(self, request, view):
        """
        Return `True` if permission is granted, `False` otherwise.
        """
        return True

    def has_object_permission(self, request, view, obj):
        """
        Return `True` if permission is granted, `False` otherwise.
        """
        return True

我們自己寫的權限類,應該去繼承BasePermission,修改之前寫的permission.py文件

# utils/permission.py

from rest_framework.permissions import BasePermission

class SVIPPremission(BasePermission):
    message = "必須是SVIP才能訪問"
    def has_permission(self,request,view):
        if request.user.user_type != 3:
            return False
        return True


class MyPremission(BasePermission):
    def has_permission(self,request,view):
        if request.user.user_type == 3:
            return False
        return True

總結:

(1)使用

  • 自己寫的權限類:1.必須繼承BasePermission類; 2.必須實現:has_permission方法

(2)返回值

  • True 有權訪問
  • False 無權訪問

(3)局部

  • permission_classes = [MyPremission,]

(4)全局

REST_FRAMEWORK = {
   #權限
    "DEFAULT_PERMISSION_CLASSES":['API.utils.permission.SVIPPremission'],
}

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※幫你省時又省力,新北清潔一流服務好口碑

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

使用DragonFly進行智能鏡像分發

Dragonfly 是一款基於 P2P 的智能鏡像和文件分發工具。它旨在提高文件傳輸的效率和速率,最大限度地利用網絡帶寬,尤其是在分發大量數據時,例如應用分發、緩存分發、日誌分發和鏡像分發。

在阿里巴巴,Dragonfly 每個月會被調用 20 億次,分發的數據量高達 3.4PB。Dragonfly 已成為阿里巴巴基礎設施中的重要一環。

儘管容器技術大部分時候簡化了運維工作,但是它也帶來了一些挑戰:例如鏡像分發的效率問題,尤其是必須在多個主機上複製鏡像分發時。

Dragonfly 在這種場景下能夠完美支持 Docker 和 PouchContainer。它也兼容其他格式的容器。相比原生方式,它能將容器分發速度提高 57 倍,並讓 Registry 網絡出口流量降低 99.5%。
Dragonfly 能讓所有類型的文件、鏡像或數據分發變得簡單而經濟。

更多請通過官方文檔了解。

純Docker部署

這裏採用多機部署,方案如下:

應用 IP
服務端 172.17.100.120
客戶端 172.17.100.121
客戶端 172.17.100.122

部署服務端

以docker方式部署,命令如下:

docker run -d --name supernode --restart=always -p 8001:8001 -p 8002:8002 \
    dragonflyoss/supernode:0.3.0 -Dsupernode.advertiseIp=172.17.100.120

部署客戶端

準備配置文件
Dragonfly 的配置文件默認位於 /etc/dragonfly 目錄下,使用容器部署客戶端時,需要將配置文件掛載到容器內。
為客戶端配置 Dragonfly Supernode 地址:

cat <<EOD > /etc/dragonfly/dfget.yml
nodes:
    - 172.17.100.120
EOD

啟動客戶端
docker run -d --name dfclient --restart=always -p 65001:65001 \
    -v /etc/dragonfly:/etc/dragonfly \
    dragonflyoss/dfclient:v0.3.0 --registry https://index.docker.io

registry是倉庫地址,這裏使用的官方倉庫

修改Docker Daemon配置

我們需要修改 Dragonfly 客戶端機器(dfclient0, dfclient1)上 Docker Daemon 配置,通過 mirror 方式來使用 Dragonfly 進行鏡像的拉取。
在配置文件 /etc/docker/daemon.json 中添加或更新如下配置項:

{
  "registry-mirrors": ["http://127.0.0.1:65001"]
}

然後重啟Docker

systemctl restart docker

拉取鏡像測試

在任意一台客戶端上進行測試,比如:

docker pull tomcat

驗證

查看client端的日誌,如果輸出如下,則表示是通過DragonFly來傳輸的。

docker exec dfclient grep 'downloading piece' /root/.small-dragonfly/logs/dfclient.log
2020-06-20 15:56:49.813 INFO sign:146-1592668602.159 : downloading piece:{"taskID":"4d977359836129ce2eec4b8418a7042c47db547a239e2a577ddc787ee177289c","superNode":"172.17.100.120","dstCid":"cdnnode:172.17.100.120~4d977359836129ce2eec4b8418a7042c47db547a239e2a577ddc787ee177289c","range":"0-4194303","result":503,"status":701,"pieceSize":4194304,"pieceNum":0}

如果需要查看鏡像是否通過其他 peer 節點來完成傳輸,可以執行以下命令:

docker exec dfclient grep 'downloading piece' /root/.small-dragonfly/logs/dfclient.log | grep -v cdnnode

如果以上命令沒有輸出結果,則說明鏡像沒有通過其他peer節點完成傳輸,否則說明通過其他peer節點完成傳輸。

在Kubernetes中部署

服務端以Deployment的形式部署

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: supernode
  name: supernode
  namespace: kube-system
spec:
  replicas: 1
  selector:
    matchLabels:
      app: supernode
  template:
    metadata:
      labels:
        app: supernode
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ""
    spec:
      containers:
      - image: dragonflyoss/supernode:0.3.0
        name: supernode
        ports:
        - containerPort: 8080
          hostPort: 8080
          name: tomcat
          protocol: TCP
        - containerPort: 8001
          hostPort: 8001
          name: register
          protocol: TCP
        - containerPort: 8002
          hostPort: 8002
          name: download
          protocol: TCP
        volumeMounts:
        - mountPath: /etc/localtime
          name: ltime
        - mountPath: /home/admin/supernode/logs/
          name: log
        - mountPath: /home/admin/supernode/repo/
          name: data
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      restartPolicy: Always
      tolerations:
      - effect: NoExecute
        operator: Exists
      - effect: NoSchedule
        operator: Exists
      nodeSelector:
        node-role.kubernetes.io/master: ""
      volumes:
      - hostPath:
          path: /etc/localtime
          type: ""
        name: ltime
      - hostPath:
          path: /data/log/supernode
          type: DirectoryOrCreate
        name: log
      - hostPath:
          path: /data/supernode/repo/
          type: DirectoryOrCreate
        name: data

---
kind: Service
apiVersion: v1
metadata:
  name: supernode
  namespace: kube-system
spec:
  selector:
    app: supernode
  ports:
  - name: register
    protocol: TCP
    port: 8001
    targetPort: 8001
  - name: download
    protocol: TCP
    port: 8002
    targetPort: 8002

以hostNetwork的形式部署在master上。

部署過後可以看到supernode已經正常啟動了。

# kubectl get pod -n kube-system | grep supernode
supernode-86dc99f6d5-mblck                 1/1     Running   0          4m1s

客戶端以daemonSet的形式部署,yaml文件如下:

apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: dfdaemon
  namespace: kube-system
spec:
  selector:
    matchLabels:
      app: dfdaemon
  template:
    metadata:
      annotations:
        scheduler.alpha.kubernetes.io/critical-pod: ""
      labels:
        app: dfdaemon
    spec:
      containers:
      - image: dragonflyoss/dfclient:v0.3.0
        name: dfdaemon
        imagePullPolicy: IfNotPresent
        args:
        - --registry https://index.docker.io
        resources:
          requests:
            cpu: 250m
        volumeMounts:
        - mountPath: /etc/dragonfly/dfget.yml
          subPath: dfget.yml
          name: dragonconf
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      restartPolicy: Always
      tolerations:
      - effect: NoExecute
        operator: Exists
      - effect: NoSchedule
        operator: Exists
      volumes:
      - name: dragonconf
        configMap:
          name: dragonfly-conf

配置文件我們以configMap的形式掛載,所以我們還需要編寫一個configMap的yaml文件,如下:

apiVersion: v1
kind: ConfigMap
metadata:
  name: dragonfly-conf
  namespace: kube-system
data:
  dfget.yml: |
    nodes:
    - 172.17.100.120

部署過後觀察結果

# kubectl get pod -n kube-system | grep dfdaemon
dfdaemon-mj4p6                             1/1     Running   0          3m51s
dfdaemon-wgq5d                             1/1     Running   0          3m51s
dfdaemon-wljt6                             1/1     Running   0          3m51s

然後修改docker daemon的配置,如下:

{
  "registry-mirrors": ["http://127.0.0.1:65001"]
}

重啟docker

systemctl restart docker

現在我們來拉取鏡像測試,並觀察日誌輸出。
下載鏡像(在master上測試的):

docker pull nginx

然後觀察日誌

kubectl exec  -n kube-system dfdaemon-wgq5d  grep 'downloading piece' /root/.small-dragonfly/logs/dfclient.log

看到日誌輸出如下,表示成功

2020-06-20 17:14:54.578 INFO sign:128-1592673287.190 : downloading piece:{"taskID":"089dc52627a346df2a2ff67f6c07497167b35c4bad2bca1e9aad087441116982","superNode":"172.17.100.120","dstCid":"cdnnode:192.168.235.192~089dc52627a346df2a2ff67f6c07497167b35c4bad2bca1e9aad087441116982","range":"0-4194303","result":503,"status":701,"pieceSize":4194304,"pieceNum":0}

今天的測試就到這裏,我這是自己的小集群實驗室,效果其實並不明顯,在大集群效果可能更好。

  • 參考
    • https://d7y.io/zh-cn/docs/userguide/multi_machines_deployment.html

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

FB行銷專家,教你從零開始的技巧

006.OpenShift持久性存儲

一 持久存儲

1.1 持久存儲概述

默認情況下,運行容器使用容器內的臨時存儲。Pods由一個或多個容器組成,這些容器一起部署,共享相同的存儲和其他資源,可以在任何時候創建、啟動、停止或銷毀。使用臨時存儲意味着,當容器停止時,寫入容器內的文件系統的數據將丟失。
當容器在停止時也需要持久的保存數據時,OpenShift使用Kubernetes持久卷(PVs)為pod提供持久存儲。

1.2 持久存儲場景

通常用於數據庫,啟動一個數據庫的pod時提供的默認臨時存儲。如果銷毀並重新創建數據庫pod,則銷毀臨時存儲並丟失數據。如果使用持久存儲,則數據庫將數據存儲到pod外部的持久卷中。如果銷毀並重新創建pod,數據庫應用程序將繼續訪問存儲數據的相同外部存儲。

1.3 持久存儲相關概念

持久卷(PV)是OpenShift資源,它只由OpenShift管理員創建和銷毀。持久卷資源表示所有OpenShift節點都可以訪問的網絡連接存儲。
持久性存儲組件:
OCP使用Kubernetes持久卷(PV)技術,允許管理員為集群提供持久性存儲。開發人員使用持久性卷聲明(PVC)請求PV資源,而不需要了解具體的底層存儲基礎設施。
Persistent Volume:PV是OpenShift集群中的資源,由PersistentVolume API對象定義,它表示集群中由管理員提供的現有網絡存儲的一部分。它是集群中的資源,就像節點是集群資源一樣。PV的生命周期獨立於使用PV的任何單獨pod。
Persistent Volume Claim:pvc由PersistentVolumeClaim API對象定義,該對象表示開發人員對存儲的請求。它與pod類似,pod消耗節點資源,而pvc消耗PV資源。

1.4 持久存儲插件

卷是掛載的文件系統,對pods及其容器可用,並且可以由許多本地或網絡連接的存儲進行備份。OpenShift使用插件來支持以下不同的後端用於持久存儲:

  • NFS
  • GlusterFS
  • OpenStack Cinder
  • Ceph RBD
  • AWS Elastic Block Store (EBS)
  • GCE Persistent Disk
  • iSCSI
  • Fibre Channel
  • Azure Disk and Azure File
  • FlexVolume (allows for the extension of storage back-ends that do not have a built-in plug-in)
  • VMWare vSphere
  • Dynamic Provisioning and Creating Storage Classes
  • Volume Security
  • Selector-Label Volume Binding

1.5 PV訪問模式

PV可以以resource provider的任何方式掛載在主機上,provider具有不同的功能,並且每個持久卷的訪問模式都設置為該特定卷支持的特定模式。例如,NFS可以支持多個讀/寫客戶端,但是特定的NFS PV可以在服務器上作為只讀導出。
每個PV接收自己的一組訪問模式,描述特定的持久卷的功能。
訪問模式見下錶:

訪問模式 CLI縮寫 描述
ReadWriteOnce RWO 卷可以被單個節點掛載為讀/寫
ReadOnlyMany ROX 卷可以由許多節點以只讀方式掛載
ReadWriteMany RWX 卷可以被許多節點掛載為讀/寫

PV claims與具有類似訪問模式的卷匹配。唯一的兩個匹配標準是訪問模式和大小。claim的訪問模式表示請求。因此,可以授予用戶更大的訪問權限,但絕不能減少訪問權限。例如,如果一個claim請求RWO,但是惟一可用的卷是NFS PV (RWO+ROX+RWX),那麼claim將匹配NFS,因為它支持RWO。
所有具有相同模式的卷都被分組,然後按大小(從最小到最大)排序。
master上負責將PV綁定到PVC上的service接收具有匹配模式的組,並在每個組上迭代(按大小順序),直到一個大小匹配為止,然後將PV綁定到PVC上。

1.6 Persistent Volume Storage Classes

PV Claims可以通過在storageClassName屬性中指定它的名稱來選擇性地請求特定的存儲類。只有與PVC具有相同存儲類名稱的請求類的pv才能綁定到PVC。
集群管理員可以為所有PVC設置一個默認存儲類,或者配置動態供應程序來服務一個或多個存儲類,這些存儲類將匹配可用PVC中的規範。

1.7 創建pv和PVC資源

pv是集群中的資源,pvc是對這些資源的請求,也充當對資源的claim檢查。pv與PVCs的相互作用具有以下生命周期:

  • 創建持久卷

集群管理員創建任意數量的pv,這些pv表示集群用戶可以通過OpenShift API使用的實際存儲的信息。

  • 定義持久卷聲明

用戶創建具有特定存儲量、特定訪問模式和可選存儲類的PVC。master監視新的pvc,要麼找到匹配的PV,要麼等待存儲類創建一個供應程序,然後將它們綁定在一起。

  • 使用持久存儲

Pods使用claims作為卷。集群檢查查找綁定卷的聲明,併為pod綁定該卷。對於那些支持多種訪問模式的卷,用戶在將其聲明用作pod中的卷時指定需要哪種模式。
一旦用戶有了一個claim,並且該claim被綁定,綁定的PV就屬於用戶,使用過程中該PV都屬於該用戶。用戶通過在pod的Volume中包含一個持久的卷claim來調度pod並訪問其聲明的pv。

1.8 使用NFS的PV

OpenShift使用隨機uid運行容器,因此將Linux用戶從OpenShift節點映射到NFS服務器上的用戶並不能正常工作。作為OpenShift pv使用的NFS共享必須遵從如下配置:

  • 屬於nfsnobody用戶和組。
  • 擁有rwx——權限(即0700)。
  • 使用all_squash選項

示例配置:
/var/export/vol *(rw,async,all_squash)
其他NFS export選項,例如sync和async,與OpenShift無關。如果使用任何一個選項,OpenShift都可以工作。但是,在高延遲環境中,添加async選項可以加快NFS共享的寫操作(例如,將image push到倉庫的場景)。
使用async選項更快,因為NFS服務器在處理請求時立即響應客戶端,而不需要等待數據寫到磁盤。
當使用sync選項時,則相反,NFS服務器只在數據寫到磁盤之後才響應客戶端。
注意:NFS共享文件系統大小和用戶配額對OpenShift沒有影響。PV大小在PV資源定義中指定。如果實際文件系統更小,則PV被創建並綁定。如果PV更大,OpenShift不會將使用的空間限製為指定的PV大小,並且允許容器使用文件系統上的所有空閑空間。OpenShift自身提供了存儲配額和存儲位置限制,可用於控制項目中的資源分配。
默認的SELinux策略不允許容器訪問NFS共享。必須在每個OpenShift實例節點中更改策略,方法是將virt_use_nfs和virt_sandbox_use_nfs變量設置為true。

  1 # setsebool -P virt_use_nfs=true
  2 # setsebool -P virt_sandbox_use_nfs=true

 

1.9 NFS回收政策

NFS支持OpenShift的Recyclable插件,根據在每個持久卷上設置的策略處理自動執行回收任務。
默認情況下,持久卷被設置為Retain。Retain reclaim策略允許手動回收資源。當刪除pv claim時,持久卷仍然存在,並且認為該卷已被釋放。但它還不能用於另一個claim,因為來自前一個claim的數據仍然保留在卷上。此時管理員可以手動回收卷。
NFS卷及其回收策略設置為Recycle,表示在從claim中釋放后將被清除。例如,當將NFS回收策略設置為Recycle后,在刪除用戶綁定到該卷的pv claim之後,會在該卷上運行rm -rf命令。在它被回收之後,NFS卷可以直接綁定到一個新的pv claim。

1.10 Supplemental group

Supplemental group是常規的Linux組。當一個進程在Linux中運行時,它有一個UID、一個GID和一個或多個Supplemental group。可以為容器的主進程設置這些屬性。
Supplemental groupid通常用於控制對共享存儲的訪問,比如NFS和GlusterFS,而fsGroup用於控制對塊存儲(如Ceph的RBD活iSCSI)的訪問。
OpenShift共享存儲插件掛載卷,以便使掛載上的POSIX權限與目標存儲上的權限匹配。例如,如果目標存儲的所有者ID是1234,組ID是5678,那麼宿主節點和容器中的掛載將具有相同的ID。因此,容器的主進程必須匹配一個或兩個id,才能訪問該卷。

  1 [root@node ~]# showmount -e
  2 Export list for master.lab.example.com:
  3 /var/export/nfs-demo *
  4 [root@services ~]# cat /etc/exports.d/nfs-demo.conf
  5 /var/export/nfs-demo
  6 ...
  7 [root@services ~]# ls -lZ /var/export -d
  8 drwx------. 10000000 650000 unconfined_u:object_r:usr_t:s0 /var/export/nfs-demo

 
圖上示例,UID 10000000和組650000可以訪問/var/export/nfs-demo export。通常,容器不應該作為root用戶運行。在這個NFS示例中,如果容器不是作為UID 10000000運行的,並且不是組650000的成員,那麼這些容器就不能訪問NFS export。

1.11 通過fsgroup使用塊存儲

fsGroup定義了pod的“file-system group”ID,該ID被添加到容器的supplemental group中。supplemental group ID應用於共享存儲,而fsGroup ID用於塊存儲。
塊存儲,如Ceph RBD、iSCSI和各種類型的雲存儲,通常專用於單個pod。與共享存儲不同,塊存儲由pod接管,這意味着pod(或image)定義中提供的用戶和組id應用於實際的物理塊設備,塊存儲通常不共享。

1.12 SELINUX和卷security

除了SCC之外,所有預定義的安全上下文約束都將seLinuxContext設置為MustRunAs。最可能匹配pod需求的SCC迫使pod使用SELinux策略。pod使用的SELinux策略可以在pod本身、image、SCC或project(提供默認值)中定義。
SELinux標籤可以在pod的securityContext中定義。,並支持user、role、type和level標籤。

1.13 ELinuxContext選項

  • MustRunAs

如果不使用預先分配的值,則要求配置seLinuxOptions。使用seLinuxOptions作為默認值,從而針對seLinuxOptions驗證。

  • RunAsAny

沒有提供默認,允許指定任何seLinuxOptions。

二 持久卷練習

2.1 前置準備

準備完整的OpenShift集群,參考《003.OpenShift網絡》2.1。

2.2 本練習準備

  1 [student@workstation ~]$ lab deploy-volume setup

2.3 配置NFS

本實驗不詳解NFS的配置和創建,直接使用/root/DO280/labs/deploy-volume/config-nfs.sh腳本實現,具體腳本內容可通過以下方式查看。
同時NFS由services節點提供。

  1 [root@services ~]# less -FiX /root/DO280/labs/deploy-volume/config-nfs.sh
  2 [root@services ~]# /root/DO280/labs/deploy-volume/config-nfs.sh		#創建NFS
  3 Export directory /var/export/dbvol created.
  4 [root@services ~]# showmount -e						#確認驗證

 

2.4 node節點掛載NFS

  1 [root@node1 ~]# mount -t nfs services.lab.example.com:/var/export/dbvol /mnt
  2 [root@node1 ~]# mount | grep /mnt
  3 [root@node1 ~]# ll -a /mnt/		#檢查相關權限

 

  1 [root@node1 ~]# umount /mnt/		#卸載

提示:建議node2也做以上掛載測試,測試完成后建議下載,NFS共享在OpenShift需要的時候會自動掛載。

2.5 創建持久卷

  1 [student@workstation ~]$ oc login -u admin -p redhat https://master.lab.example.com
  2 [student@workstation ~]$ less -FiX /home/student/DO280/labs/deploy-volume/mysqldb-volume.yml
  3 apiVersion: v1
  4 kind: PersistentVolume
  5 metadata:
  6   name: mysqldb-volume
  7 spec:
  8   capacity:
  9     storage: 3Gi
 10   accessModes:
 11   - ReadWriteMany
 12   nfs:
 13     path: /var/export/dbvol
 14     server: services.lab.example.com
 15   persistentVolumeReclaimPolicy: Recycle
 16 [student@workstation ~]$ oc create -f /home/student/DO280/labs/deploy-volume/mysqldb-volume.yml
 17 [student@workstation ~]$ oc get pv		#查看PV
 18 NAME    CAPACITYACCESS    MODES    RECLAIM    POLICY STATUS    CLAIM    STORAGECLASS    REASON    AGE
 19 mysqldb-volume    3Gi     RWX      Recycle    Available                                           1m

 

2.6 創建項目

  1 [student@workstation ~]$ oc login -u developer -p redhat https://master.lab.example.com
  2 [student@workstation ~]$ oc new-project persistent-storage

 

2.7 部署應用

  1 [student@workstation ~]$ oc new-app --name=mysqldb \
  2 --docker-image=registry.lab.example.com/rhscl/mysql-57-rhel7 \
  3 -e MYSQL_USER=ose \
  4 -e MYSQL_PASSWORD=openshift \
  5 -e MYSQL_DATABASE=quotes
  6 [student@workstation ~]$ oc status		#確認驗證
  7 In project persistent-storage on server https://master.lab.example.com:443
  8 
  9 
 10 svc/mysqldb - 172.30.39.72:3306
 11   dc/mysqldb deploys istag/mysqldb:latest
 12     deployment #1 deployed 58 seconds ago - 1 pod

2.8 配置持久卷

  1 [student@workstation ~]$ oc describe pod mysqldb | grep -A2 'Volumes'	#查看當前pod的Volume
  2 Volumes:
  3   mysqldb-volume-1:
  4     Type:    EmptyDir (a temporary directory that shares a pod's lifetime)
  5 [student@workstation ~]$ oc set volumes dc mysqldb \
  6 --add --overwrite --name=mysqldb-volume-1 -t pvc \
  7 --claim-name=mysqldb-pvclaim \
  8 --claim-size=3Gi \
  9 --claim-mode='ReadWriteMany'		#修改dc並創建PVC
 10 [student@workstation ~]$ oc describe pod mysqldb | grep -E -A 2 'Volumes|ClaimName'	#查看驗證

 

  1 [student@workstation ~]$ oc get pvc		#查看PVC
  2 NAME              STATUS    VOLUME           CAPACITY   ACCESS MODES   STORAGECLASS   AGE
  3 mysqldb-pvclaim   Bound     mysqldb-volume   3Gi        RWX                           2m

 

2.9 端口轉發

  1 [student@workstation ~]$ oc get pod
  2 NAME              READY     STATUS    RESTARTS   AGE
  3 mysqldb-2-r7wz8   1/1       Running   0          4m
  4 [student@workstation ~]$ oc port-forward mysqldb-2-r7wz8 3306:3306

 

2.10 測試數據庫

  1 [student@workstation ~]$ mysql -h127.0.0.1 -uose -popenshift \
  2 quotes < /home/student/DO280/labs/deploy-volume/quote.sql	#填充數據測試
  3 [student@workstation ~]$ mysql -h127.0.0.1 -uose -popenshift \
  4 quotes -e "select count(*) from quote;"				#確認填充完成
  5 [student@workstation ~]$ ssh root@services ls -la /var/export/dbvol	#查看NFS服務端數據
  6 ……
  7 drwxr-x---. 2 nfsnobody nfsnobody       54 Jul 21 23:43 quotes
  8 ……
  9 [student@workstation ~]$ ssh root@services ls -la /var/export/dbvol/quotes
 10 total 116
 11 drwxr-x---. 2 nfsnobody nfsnobody    54 Jul 21 23:43 .
 12 drwx------. 6 nfsnobody nfsnobody  4096 Jul 21 23:39 ..
 13 -rw-r-----. 1 nfsnobody nfsnobody    65 Jul 21 23:39 db.opt
 14 -rw-r-----. 1 nfsnobody nfsnobody  8584 Jul 21 23:43 quote.frm
 15 -rw-r-----. 1 nfsnobody nfsnobody 98304 Jul 21 23:44 quote.ibd

 

2.11 刪除PV

  1 [student@workstation ~]$ oc delete project persistent-storage	#刪除項目
  2 project "persistent-storage" deleted
  3 [student@workstation ~]$ oc delete pv mysqldb-volume		#刪除PV
  4 persistentvolume "mysqldb-volume" deleted

   

2.12 驗證持久性

刪除PV后驗證數據是否會長期保留。

  1 [student@workstation ~]$ ssh root@services ls -la /var/export/dbvol
  2 ……
  3 drwxr-x---. 2 nfsnobody nfsnobody       54 Jul 21 23:43 quotes
  4 ……
  5 [student@workstation ~]$ ssh root@services rm -rf /var/export/dbvol/*	#使用rm才可以徹底刪除

 

三 私有倉庫持久存儲

3.1 創建私有倉庫持久卷

OCP內部倉庫是source-to-image(S2I)流程的一個重要組件,該流程用於從應用程序源代碼創建pod。S2I流程的最終輸出是一個容器image,它被推送到OCP內部倉庫,然後可以用於部署。
在生產環境中,通常建議為內部倉庫提供一個持久性存儲。否則,在重新創建registry pod之後,S2I創建的pod可能無法啟動。例如,在master節點重新啟動之後。
OpenShift安裝程序配置並啟動一個默認的持久倉庫,該倉庫使用NFS共享,由Inventory文件中的openshift_hosted_registry_storage_*變量定義。在生產環境中,Red Hat建議由外部專用的存儲提供持久性存儲,該服務器配置為彈性和高可用性。
高級安裝程序將NFS服務器配置為使用外部NFS服務器上的持久存儲,在[NFS]字段中定義的一個NFS服務器的列表。該服務器與openshift_hosted_registry_storage*變量一起使用,以配置NFS服務器。
示例配置:

  1 [OSEv3:vars]
  2 openshift_hosted_registry_storage_kind=nfs		#定義OCP存儲後端
  3 openshift_hosted_registry_storage_access_modes=['ReadWriteMany']	#定義訪問模式,默認為ReadWriteMany,表示允許多個節點以讀寫形式掛載
  4 openshift_hosted_registry_storage_nfs_directory=/exports		#定義NFS服務器上的NFS存儲目錄
  5 openshift_hosted_registry_storage_nfs_options='*(rw,root_squash)'	#定義存儲卷的NFS選項。這些選項被添加到/etc/ exports.d/openshift-ansible.exports中。rw選項允許對NFS卷進行讀寫訪問,root_squash選項阻止遠程連接的根用戶擁有root特權,併為nfsnobody分配用戶ID
  6 openshift_hosted_registry_storage_volume_name=registry		#定義要用於持久倉庫的NFS目錄的名稱
  7 openshift_hosted_registry_storage_volume_size=40Gi			#定義持久卷大小
  8 ... output omitted ...
  9 [nfs]
 10 services.lab.example.com

 
在為持久倉庫安裝和配置存儲之後,OpenShift在OpenShift項目中創建一個名為register-volume的持久卷。持久性卷的容量為40gb,並且根據定義設置了Retain策略。同時默認項目中的pvc調用pv。

  1 [student@workstation ~]$ oc describe pv registry-volume
  2 Name:            registry-volume	#定義持久卷名
  3 Labels:          <none>
  4 Annotations:     pv.kubernetes.io/bound-by-controller=yes
  5 StorageClass:
  6 Status:          Bound
  7 Claim:           default/registry-claim	#定義使用持久卷的聲明
  8 Reclaim Policy:  Retain			#默認持久卷策略,具有Retain策略的卷在從其聲明中釋放后不會被擦除
  9 Access Modes:    RWX			#定義持久卷的訪問模式,由Ansible inventory文件的openshift_hosted_registry_storage_access_modes=['ReadWriteMany']變量定義
 10 Capacity:        40Gi			#定義持久卷的大小,由Ansible inventory文件的openshift_hosted_registry_storage_volume_size變量定義
 11 Message:
 12 Source:					#定義存儲後端的位置和NFS共享
 13     Type:      NFS (an NFS mount that lasts the lifetime of a pod)
 14     Server:    services.lab.example.com
 15     Path:      /exports/registry
 16     ReadOnly:  false
 17 Events:        <none>

 
運行以下命令,確認OpenShift內部倉庫已配置registry-volume作為默認的PersistentVolumeClaim。

  1 [user@demo ~] oc describe dc/docker-registry | grep -A4 Volumes
  2   Volumes:
  3    registry-storage:
  4     Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
  5     ClaimName:  registry-claim
  6     ReadOnly:   false

 
OCP內部倉庫將image和metadata存儲為普通文件和文件夾,這意味着可以檢查PV源存儲,查看倉庫是否向其寫入了文件。
在生產環境中,這是通過訪問外部NFS服務器來完成的。但是,在本環境中,NFS共享是在services的VM上配置的,因此ssh至services查看,以便於驗證OCP內部倉庫成功將image存儲到持久存儲中。
示例:一個名為hello的應用程序在default命名空間中運行,下面的命令驗證圖像是否存儲在持久存儲中。

  1 [user@demo ~] ssh root@master ls -l \
  2 /var/export/registryvol/docker/registry/v2/repositories/default/

 

四 PV綜合實驗

4.1 前置準備

準備完整的OpenShift集群,參考《003.OpenShift網絡》2.1。

4.2 本練習準備

[student@workstation ~]$ lab storage-review setup

4.3 配置NFS

本實驗不詳解NFS的配置和創建,直接使用/root/DO280/labs/deploy-volume/config-nfs.sh腳本實現,具體腳本內容可通過以下方式查看。
同時NFS由services節點提供。

  1 [root@services ~]# less -FiX /root/DO280/labs/storage-review/config-review-nfs.sh
  2 [root@services ~]# /root/DO280/labs/storage-review/config-review-nfs.sh		#創建NFS
  3 [root@services ~]# showmount -e							#確認驗證

 

4.4 創建持久卷

  1 [student@workstation ~]$ oc login -u admin -p redhat https://master.lab.example.com
  2 [student@workstation ~]$ less -FiX /home/student/DO280/labs/storage-review/review-volume-pv.yaml
  3 apiVersion: v1
  4 kind: PersistentVolume
  5 metadata:
  6   name: review-pv
  7 spec:
  8   capacity:
  9     storage: 3Gi
 10   accessModes:
 11   - ReadWriteMany
 12   nfs:
 13     path: /var/export/review-dbvol
 14     server: services.lab.example.com
 15   persistentVolumeReclaimPolicy: Recycle
 16 [student@workstation ~]$ oc create -f /home/student/DO280/labs/storage-review/review-volume-pv.yaml
 17 [student@workstation ~]$ oc get pv		#查看PV
 18 NAME    CAPACITYACCESS    MODES    RECLAIM    POLICY STATUS    CLAIM    STORAGECLASS    REASON    AGE
 19 review-pv    3Gi     RWX      Recycle    Available                                           13s

 

4.5 部署模板

  1 [student@workstation ~]$ less -FiX /home/student/DO280/labs/storage-review/instructor-template.yaml
  2 [student@workstation ~]$ oc create -n openshift -f /home/student/DO280/labs/storage-review/instructor-template.yaml
  3 #使用模板創建應用至openshift namespace中

 

4.6 創建項目

  1 [student@workstation ~]$ oc login -u developer -p redhat https://master.lab.example.com
  2 [student@workstation ~]$ oc new-project instructor

 

4.7 web部署應用

瀏覽器訪問: https://master.lab.example.com

選擇Catalog

選擇PHP,並使用instructor-template。

設置Application Hostname,然後直接下一步,模板會創建一個數據庫服務器。

單擊Continue to project overview以監視應用程序的構建過程。從提供的服務框架中,單擊講師。單擊部署配置#1條目旁邊的下拉箭頭,打開部署面板。當構建完成時,build部分的Complete旁邊應該出現一個綠色的複選標記。

4.8 端口轉發

  1 [student@workstation ~]$ oc login -u developer -p redhat https://master.lab.example.com
  2 [student@workstation ~]$ oc get pod
  3 NAME                 READY     STATUS      RESTARTS   AGE
  4 instructor-1-9fmct   1/1       Running     0          43s
  5 instructor-1-build   0/1       Completed   0          2m
  6 mysql-1-f7rrq        1/1       Running     0          2m
  7 [student@workstation ~]$ oc port-forward mysql-1-f7rrq 3306:3306

 

4.9 填充數據庫

  1 [student@workstation ~]$ mysql -h127.0.0.1 -u instructor -ppassword \
  2 instructor < /home/student/DO280/labs/storage-review/instructor.sql
  3 [student@workstation ~]$ mysql -h127.0.0.1 -u instructor -ppassword instructor -e "select * from instructors;"	#查看
  4 

4.10 測試訪問

workstations的瀏覽器訪問:http://instructor.apps.lab.example.com

4.11 測試添加數據

 
 

4.12 確認驗證

  1 [student@workstation ~]$ lab storage-review grade		#環境腳本判斷實驗

4.13 清理刪除

  1 [student@workstation ~]$ oc login -uadmin -predhat
  2 [student@workstation ~]$ oc delete project instructor
  3 [student@workstation ~]$ oc delete pv review-pv
  4 [student@workstation ~]$ ssh root@services
  5 [root@services ~]# rm -rf /var/export/review-dbvol
  6 [root@services ~]# rm -f /etc/exports.d/review-dbvol.exports

  本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

新北清潔公司,居家、辦公、裝潢細清專業服務

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

C#多線程編程(一)進程與線程

一、 進程

        簡單來說,進程是對資源的抽象,是資源的容器,在傳統操作系統中,進程是資源分配的基本單位,而且是執行的基本單位,進程支持併發執行,因為每個進程有獨立的數據,獨立的堆棧空間。一個程序想要併發執行,開多個進程即可。

Q1:在單核下,進程之間如何同時執行?

        首先要區分兩個概念——併發和并行

  • 併發:併發是指在一段微小的時間段中,有多個程序代碼段被CPU執行,宏觀上表現出來就是多個程序能”同時“執行。
  • 并行:并行是指在一個時間點,有多個程序段代碼被CPU執行,它才是真正的同時執行。

        所以應該說進程之間是併發執行。對於CPU來講,它不知道進程的存在,CPU主要與寄存器打交道。有一些常用的寄存器,如程序計數器寄存器,這個寄存器存儲了將要執行的指令的地址,這個寄存器的地址指向哪,CPU就去哪。還有一些堆棧寄存器和通用寄存器等等等,總之,這些數據構成了一個程序的執行環境,這個執行環境就叫做”上下文(Context)“,進程之間切換本質就是保存這些數據到內存,術語叫做”保存現場“,然後恢復某個進程的執行環境,也即是”恢復現場“,整個過程術語叫做“上下文切換”,具體點就是進程上下文切換,這就是進程之間能併發執行的本質——頻繁的切換進程上下文。這個功能是由操作系統提供的,是內核態的,對應用軟件開發人員透明。

二、 線程

        進程雖然支持併發,但是對併發不是很友好,不友好是指每開啟一個進程,都要重新分配一部分資源,而線程相對進程來說,創建線程的代價比創建進程要小,所以引入線程能更好的提高併發性。在現代操作系統中,進程變成了資源分配的基本單位,而線程變成了執行的基本單位,每個線程都有獨立的堆棧空間,同一個進程的所有線程共享代碼段和地址空間等共享資源。相應的上下文切換從進程上下文切換變成了線程上下文切換

三、 為什麼要引入進程和線程

  1. 提高CPU利用率,在早期的單道批處理系統中,如果執行中的代碼需要依賴與外部條件,將會導致CPU空閑,例如文件讀取,等待鍵盤信號輸入,這將浪費大量的CPU時間。引入多進程和線程可以解決CPU利用率低這個問題。
  2. 隔離程序之間的數據(每個進程都有單獨的地址空間),保證系統運行的穩定性。
  3. 提高系統的響應性和交互能力。

四、 在C#中創建託管線程

1. Thread類

在.NET中,託管線程分為:

  • 前台線程
  • 後台線程

一個.Net程序中,至少要有一個前台線程,所有前台線程結束了,所有的後台線程將會被公共語言運行時(CLR)強制銷毀,程序執行結束。

如下將在控制台程序中創建一個後台線程

 1 static void Main(string[] args)
 2 {
 3      var t = new Thread(() =>
 4      {
 5          Thread.Sleep(1000);
 6          Console.WriteLine("執行完畢");
 7      });
 8     t.IsBackground = true;
 9      t.Start();
10 }

View Code

 

主線程(默認是前台線程)執行完畢,程序直接退出。

但IsBackground 屬性改為false時,控制台會打印“執行完畢”。

2. 有什麼問題

直接使用Thread類來進行多線程編程浪費資源(服務器端更加明顯)且不方便,舉個栗子。

假如我寫一個Web服務器程序,每個請求創建一個線程,那麼每一次我都要new一個Thread對象,然後傳入處理HttpRequest的委託,處理完之後,線程將會被銷毀,這將會導致浪費大量CPU時間和內存,在早期CPU性能不行和內存資源珍貴的情況下這個缺點會被放大,在現在這個缺點不是很明顯,原因是硬件上來了。

不方便體現在哪呢?

  • 無法直接獲取另一個線程內未被捕捉的異常
  • 無法直接獲取線程函數的返回值

 

 1 public static void ThrowException()
 2 {
 3      throw new Exception("發生異常");
 4 }
 5 static void Main(string[] args)
 6 {
 7      var t = new Thread(() =>
 8      {
 9          Thread.Sleep(1000);
10          ThrowException();
11      });
12     t.IsBackground = false;
13      try
14      {
15          t.Start();
16      }
17      catch(Exception e)
18      {
19          Console.WriteLine(e.Message);
20      }
21 }

View Code

 

上述代碼將會導致程序奔潰,如下圖。

 

要想直接獲取返回值和可以直接從主線程捕捉線程函數內未捕捉的異常,我們可以這麼做。

新建一個MyTask.cs文件,內容如下

 1 using System;
 2 using System.Threading;
 3 namespace ConsoleApp1
 4 {
 5      public class MyTask
 6      {
 7          private Thread _thread;
 8          private Action _action;
 9          private Exception _innerException;
10         public MyTask()
11          {
12         }
13          public MyTask(Action action)
14          {
15              _action = action;
16          }
17          protected virtual void Excute()
18          {
19              try
20              {
21                  _action();
22              }
23              catch(Exception e)
24              {
25                  _innerException = e;
26              }
27       
28          }
29          public void Start()
30          {
31              if (_thread != null) throw new InvalidOperationException("任務已經開始");
32              _thread = new Thread(() => Excute());
33              _thread.Start();
34          }
35          public void Start(Action action)
36          {
37              _action = action;
38              if (_thread != null) throw new InvalidOperationException("任務已經開始");
39              _thread = new Thread(() => Excute());
40              _thread.Start();
41          }
42         public void Wait()
43          {
44              _thread.Join();
45              if (_innerException != null) throw _innerException;
46          }
47      }
48     public class MyTask<T> : MyTask
49      {
50          private Func<T> _func { get; }
51          private T _result;
52          public T Result {
53              
54              private set => _result = value;
55              get 
56              {
57                  base.Wait();
58                  return _result;
59              }
60          }
61          public MyTask(Func<T> func)
62          {
63              _func = func;
64          }
65         public new void Start() 
66          {
67              base.Start(() =>
68              {
69                  Result = _func();
70              });
71          }
72     }
73 }

View Code

 

簡單的包裝了一下(不要在意細節),我們便可以實現我們想要的效果。

測試代碼如下

 1 public static void ThrowException()
 2 {
 3      throw new Exception("發生異常");
 4 }
 5 public static void Test3()
 6 {
 7      MyTask<string> myTask = new MyTask<string>(() =>
 8      {
 9          Thread.Sleep(1000);
10          return "執行完畢";
11      });
12     myTask.Start();
13     try
14      {
15          Console.WriteLine(myTask.Result);
16      }
17      catch (Exception e)
18      {
19          Console.WriteLine(e.Message);
20      }
21 }
22 public static void Test2()
23 {
24      MyTask<string> myTask = new MyTask<string>(() =>
25      {
26          Thread.Sleep(1000);
27          ThrowException();
28          return "執行完畢";
29      });
30     myTask.Start();
31     try
32      {
33          Console.WriteLine(myTask.Result);
34      }
35      catch(Exception e)
36      {
37          Console.WriteLine(e.Message);
38      }
39 }
40 public static void Test1()
41 {
42      MyTask myTask = new MyTask(() =>
43      {
44          Thread.Sleep(1000);
45          ThrowException();
46      });
47      myTask.Start();
48     try
49      {
50          myTask.Wait();
51      }
52      catch (Exception e)
53      {
54          Console.WriteLine(e.Message);
55      }
56 }
57 static void Main(string[] args)
58 {
59      Test1();
60      Test2();
61      Test3();
62 }

 

可以看到,我們可以通過簡單包裝Thread對象,便可實現如下效果

  • 直接讀取線程函數返回值
  • 直接捕捉線程函數未捕捉的異常(前提是調用了Wait()函數或者Result屬性)

這是理解和運用Task的基礎,Task功能非常完善,但是運用好Task需要掌握許多概念,下面再說。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

原來你是這樣的BERT,i了i了! —— 超詳細BERT介紹(一)BERT主模型的結構及其組件

原來你是這樣的BERT,i了i了! —— 超詳細BERT介紹(一)BERT主模型的結構及其組件

BERTBidirectional Encoder Representations from Transformers)是谷歌在2018年10月推出的深度語言表示模型。

一經推出便席捲整個NLP領域,帶來了革命性的進步。
從此,無數英雄好漢競相投身於這場追劇(芝麻街)運動。
只聽得這邊G家110億,那邊M家又1750億,真是好不熱鬧!

然而大家真的了解BERT的具體構造,以及使用細節嗎?
本文就帶大家來細品一下。

前言

本系列文章分成三篇介紹BERT,本文主要介紹BERT主模型(BertModel)的結構及其組件相關知識,另有兩篇分別介紹BERT預訓練相關和如何將BERT應用到不同的下游任務

文章中的一些縮寫:NLP(natural language processing)自然語言處理;CV(computer vision)計算機視覺;DL(deep learning)深度學習;NLP&DL 自然語言處理和深度學習的交叉領域;CV&DL 計算機視覺和深度學習的交叉領域。

文章公式中的向量均為行向量,矩陣或張量的形狀均按照PyTorch的方式描述。
向量、矩陣或張量后的括號表示其形狀。

本系列文章的代碼均是基於transformers庫(v2.11.0)的代碼(基於Python語言、PyTorch框架)。
為便於理解,簡化了原代碼中不必要的部分,並保持主要功能等價。
在代碼最開始的地方,需要導入以下包:

代碼

from math import inf, sqrt
import torch as tc
from torch import nn
from torch.nn import functional as F
from transformers import PreTrainedModel

閱讀本系列文章需要一些背景知識,包括Word2VecLSTMTransformer-BaseELMoGPT等,由於本文不想過於冗長(其實是懶),以及相信來看本文的讀者們也都是衝著BERT來的,所以這部分內容還請讀者們自行學習。
本文假設讀者們均已有相關背景知識。

目錄

  • 1、主模型
    • 1.1、輸入
    • 1.2、嵌入層
      • 1.2.1、嵌入變換
      • 1.2.2、層標準化
      • 1.2.3、隨機失活
    • 1.3、編碼器
      • 1.3.1、隱藏層
        • 1.3.1.1、線性變換
        • 1.3.1.2、激活函數
          • 1.3.1.2.1、tanh
          • 1.3.1.2.2、softmax
          • 1.3.1.2.3、GELU
        • 1.3.1.3、多頭自注意力
        • 1.3.1.4、跳躍連接
    • 1.4、池化層
    • 1.5、輸出

1、主模型

BERT的主模型是BERT中最重要組件,BERT通過預訓練(pre-training),具體來說,就是在主模型后再接個專門的模塊計算預訓練的損失(loss),預訓練后就得到了主模型的參數(parameter),當應用到下游任務時,就在主模型後接個跟下游任務配套的模塊,然後主模型賦上預訓練的參數,下游任務模塊隨機初始化,然後微調(fine-tuning)就可以了(注意:微調的時候,主模型和下游任務模塊兩部分的參數一般都要調整,也可以凍結一部分,調整另一部分)。

主模型由三部分構成:嵌入層編碼器池化層
如圖:

其中

  • 輸入:一個個小批(mini-batch),小批里是batch_size個序列(句子或句子對),每個序列由若干個離散編碼向量組成。
  • 嵌入層:將輸入的序列轉換成連續分佈式表示(distributed representation),即詞嵌入(word embedding)或詞向量(word vector)。
  • 編碼器:對每個序列進行非線性表示。
  • 池化層:取出[CLS]標記(token)的表示(representation)作為整個序列的表示。
  • 輸出:編碼器最後一層輸出的表示(序列中每個標記的表示)和池化層輸出的表示(序列整體的表示)。

下面具體介紹這些部分。

1.1、輸入

一般來說,輸入BERT的可以是一句話:

I'm repairing immortals.

也可以是兩句話:

I'm repairing immortals. ||| Me too.

其中|||是分隔兩個句子的分隔符。

BERT先用專門的標記器(tokenizer)來標記(tokenize)序列,雙句標記后如下(單句類似):

I ' m repair ##ing immortal ##s . ||| Me too .

標記器其實就是先對句子進行基於規則的標記化(tokenization),這一步可以把'm以及句號.等分割開,再進行子詞分割(subword segmentation),示例中帶##的就是被子詞分割開的部分。
子詞分割有很多好處,比如壓縮詞彙表、表示未登錄詞(out of vocabulary words, OOV words)、表示單詞內部結構信息等,以後有時間專門寫一篇介紹這個。

數據集中的句子長度不一定相等,BERT採用固定輸入序列(長則截斷,短則填充)的方式來解決這個問題。
首先需要設定一個seq_length超參數(hyperparameter),然後判斷整個序列長度是否超出,如果超出:單句截掉最後超出的部分,雙句則先刪掉較長的那句話的末尾標記,如果兩句話長度相等,則輪流刪掉兩句話末尾的標記,直到總長度達到要求(即等長的兩句話刪掉的標記數量盡量相等);如果序列長度過小,則在句子最後添加[PAD]標記,使長度達到要求。

然後在序列最開始添加[CLS]標記,以及在每句話末尾添加[SEP]標記。
單句話添加一個[CLS]和一個[SEP],雙句話添加一個[CLS]和兩個[SEP]
[CLS]標記對應的表示作為整個序列的表示,[SEP]標記是專門用來分隔句子的。
注意:處理長度時需要考慮添加的[CLS][SEP]標記,使得最終總的長度=seq_length[PAD]標記在整個序列的最末尾。

例如seq_length=12,則單句變為:

[CLS] I ' m repair ##ing immortal ##s . [SEP] [PAD] [PAD]

如果seq_length=10,則雙句變為:

[CLS] I ' m repair [SEP] Me too . [SEP]

分割完后,每一個空格分割的子字符串(substring)都看成一個標記(token),標記器通過查表將這些標記映射成整數編碼。
單句如下:

[101, 146, 112, 182, 6949, 1158, 15642, 1116, 119, 102, 0, 0]

最後整個序列由四種類型的編碼向量表示,單句如下:

標記編碼:[101, 146, 112, 182, 6949, 1158, 15642, 1116, 119, 102, 0, 0]
位置編碼:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]
句子位置編碼:[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
注意力掩碼:[1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 0, 0]

其中,標記編碼就是上面的序列中每個標記轉成編碼后得到的向量;位置編碼記錄每個標記的位置;句子位置編碼記錄每個標記屬於哪句話,0是第一句話,1是第二句話(注意:[CLS]標記對應的是0);注意力掩碼記錄某個標記是否是填充的,1表示非填充,0表示填充。

雙句如下:

標記編碼:[101, 146, 112, 182, 6949, 102, 2508, 1315, 119, 102]
位置編碼:[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
句子位置編碼:[0, 0, 0, 0, 0, 0, 1, 1, 1, 1]
注意力掩碼:[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

上面的是英文的情況,中文的話BERT直接用漢字級別表示,即

我在修仙( ̄︶ ̄)↗

這樣的句子分割成

我 在 修 仙 (  ̄ ︶  ̄ ) ↗

然後每個漢字(包括中文標點)看成一個標記,應用上述操作即可。

1.2、嵌入層

嵌入層的作用是將序列的離散編碼錶示轉換成連續分佈式表示。
離散編碼只能表示A和B相等或不等,但是如果將其表示成連續分佈式表示(即連續的N維空間向量),就可以計算\(A\)\(B\)之間的相似度或距離了,從而表達更多信息。
這個是詞嵌入或詞向量的知識,可以參考Word2Vec相關內容,本文不再贅述了。

嵌入層包含三種組件:嵌入變換(embedding)、層標準化(layer normalization)、隨機失活(dropout)。
如圖:

1.2.1、嵌入變換

嵌入變換實際上就是一個線性變換(linear transformation)。
傳統上,離散標記往往表示成一個獨熱碼(one-hot)向量,也叫標準基向量,即一個長度為\(V\)的向量,其中只有一位為\(1\),其他都為\(0\)
在NLP&DL領域,\(V\)一般是詞彙表的大小。
但是這種向量往往維數很高(詞彙表往往比較大)而且很稀疏(每個向量只有一位不為\(0\)),不好處理。
所以可以通過一個線性變換將這個向量轉換成低維稠密的向量。

假設\(v\)\(V\))是標記\(t\)的獨熱碼向量,\(W\)\(V \times H\))是一個\(V\)\(H\)列的矩陣,則\(t\)的嵌入\(e\)為:

\[e = v W \]

實際上\(W\)中每一行都可以看成一個詞嵌入,而這個矩陣乘就是把\(v\)中等於\(1\)的那個位置對應的\(W\)中的詞嵌入取出來。
在工程實踐中,由於獨熱碼向量比較占內存,而且矩陣乘效率也不高,所以往往用一個整數編碼來代替獨熱碼向量,然後直接用查表的方式取出對應的詞嵌入。

所以假設\(n\)\(t\)的編碼,一般是在詞彙表中的編號,那麼上面的公式就可以改成:

\[e = W_{n} \]

其中下標表示取出對應的行。

那麼一個標記化后的序列就可以表示成一個編碼向量。
假設序列\(T\)的編碼向量為\(s\)\(L\)),\(L\)為序列的長度,即\(T\)中有\(L\)個標記。
如果詞嵌入長度為\(H\),那麼經過嵌入變換,得到\(T\)的隱狀態(hidden state)\(h\)\(L \times H\))。

1.2.2、層標準化

層標準化類似於批標準化(batch normalization),可以加速模型訓練,但其實現方式和批標準化不一樣,層標準化是沿着詞嵌入(通道)維進行標準化的,不需要在訓練時存儲統計量來估計整體數據集的均值和方差,訓練(training)和評估(evaluation)或推理(inference)階段的操作是相同的。
另外批標準化對小批大小有限制,而層標準化則沒有限制。

假設輸入的一個詞嵌入為\(e = [x_0, x_1, …, x_{H-1}]\)\(x_k\)\(e\)\(k = 0, 1, …, (H-1)\) 維的分量,\(H\)是詞嵌入長度。
那麼層標準化就是

\[y_{k} = \frac{x_{k}-\mu}{\sigma} * \alpha_k + \beta_k \]

其中,\(y_{k}\)是輸出,\(\mu\)\(\sigma^2\)分別是均值和方差:

\[ \mu = \frac{1}{H} \sum_{k=0}^{H-1} x_{k} \\ \sigma^2 = \frac{1}{H} \sum_{k=0}^{H-1} (x_{k}-\mu)^2 \\ \]

\(\alpha_k\)\(\beta_k\)是學習得到的參數,用於防止模型表示能力退化。

注意:\(\mu\)\(\sigma^2\)是針對每個樣本每個位置的詞嵌入分別計算的,而\(\alpha_k\)\(\beta_k\)對所有的詞嵌入都是共用的;\(\sigma^2\)的計算沒有使用貝塞爾校正(Bessel’s correction)。

1.2.3、隨機失活

隨機失活是DL領域非常著名且常用的正則化(regularization)方法(然而被谷歌註冊專利了),用來防止模型過擬合(overfitting)。

具體來說,先設置一個超參數\(P \in [0, 1]\),表示按照概率\(P\)隨機將值置\(0\)
然後假設詞嵌入中某一維分量是\(x\),按照均勻隨機分佈產生一個隨機數\(r \in [0, 1]\),然後輸出值\(y\)為:

\[ y = \left\{ \begin{aligned} & \frac{x}{1-P} &, & r > P \\ & 0 &, & r \le P \\ \end{aligned} \right. \]

由於按照概率\(P\)\(0\),相當於輸出值的期望變成原來的\((1-P)\)倍,所以再對輸出值除以\((1-P)\),就可以保持期望不變。

以上操作針對訓練階段,在評估階段,輸出值等於輸入值:

\[y = x \]

嵌入層代碼如下:

代碼

# BERT之嵌入層
class BertEmb(nn.Module):
	def __init__(self, config):
		super().__init__()
		# 標記嵌入,padding_idx=0:編碼為0的嵌入始終為零向量
		self.tok_emb = nn.Embedding(config.vocab_size, config.hidden_size, padding_idx=0)
		# 位置嵌入
		self.pos_emb = nn.Embedding(config.max_position_embeddings, config.hidden_size)
		# 句子位置嵌入
		self.sent_pos_emb = nn.Embedding(config.type_vocab_size, config.hidden_size)

		# 層標準化
		self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
		# 隨機失活
		self.dropout = nn.Dropout(config.hidden_dropout_prob)

	def forward(self,
			tok_ids,  # 標記編碼(batch_size * seq_length)
			pos_ids=None,  # 位置編碼(batch_size * seq_length)
			sent_pos_ids=None,  # 句子位置編碼(batch_size * seq_length)
	):
		device = tok_ids.device  # 設備(CPU或CUDA)
		shape = tok_ids.shape  # 形狀(batch_size * seq_length)
		seq_length = shape[1]

		# 默認:[0, 1, ..., seq_length-1]
		if pos_ids is None:
			pos_ids = tc.arange(seq_length, dtype=tc.int64, device=device)
			pos_ids = pos_ids.unsqueeze(0).expand(shape)
		# 默認:[0, 0, ..., 0],即所有標記都屬於第一個句子
		if sent_pos_ids is None:
			sent_pos_ids = tc.zeros(shape, dtype=tc.int64, device=device)

		# 三種嵌入(batch_size * seq_length * hidden_size)
		tok_embs = self.tok_emb(tok_ids)
		pos_embs = self.pos_emb(pos_ids)
		sent_pos_embs = self.sent_pos_emb(sent_pos_ids)

		# 三種嵌入相加
		embs = tok_embs + pos_embs + sent_pos_embs
		# 層標準化嵌入
		embs = self.layer_norm(embs)
		# 隨機失活嵌入
		embs = self.dropout(embs)
		return embs  # 嵌入(batch_size * seq_length * hidden_size)

其中,
config是BERT的配置文件對象,裏面記錄了各種預先設定的超參數;
vocab_size是詞彙表大小;
hidden_size是詞嵌入長度,默認是768(bert-base-*)或1024(bert-large-*);
max_position_embeddings是允許的最大標記位置,默認是512;
type_vocab_size是允許的最大句子位置,即最多能輸入的句子數量,默認是2;
layer_norm_eps是一個>0並很接近0的小數\(\epsilon\),用來防止計算時發生除0等異常操作;
hidden_dropout_prob是隨機失活概率,默認是0.1;
batch_size是小批的大小,即一個小批里的樣本個數;
seq_length是輸入的編碼向量的長度。

1.3、編碼器

編碼器的作用是對嵌入層輸出的隱狀態進行非線性表示,提取出其中的特徵(feature),它是由num_hidden_layers個結構相同(超參數相同)但參數不同(不共享參數)的隱藏層串連構成的。
如圖:

1.3.1、隱藏層

隱藏層包括線性變換、激活函數(activation function)、多頭自注意力(multi-head self-attention)、跳躍連接(skip connection),以及上面介紹過的層標準化和隨機失活。
如圖:

其中,激活函數默認是GELU,線性變換均是逐位置線性變換,即對不同樣本不同位置的詞嵌入應用相同的線性變換(類似於CV&DL領域的\(1 \times 1\)卷積)。

1.3.1.1、線性變換

線性變換在CV&DL領域也叫全連接層(fully connected layer),即

\[y = x W^T + b \]

其中,\(x\)\(A\))是輸入向量,\(y\)\(B\))是輸出向量,\(W\)\(B \times A\))是權重(weight)矩陣,\(b\)\(B\))是偏置(bias)向量;\(W\)\(b\)是學習得到的參數。

另外,嚴格來說,當\(b = \vec 0\)時,上式為線性變換;當\(b \ne \vec 0\)時,上式為仿射變換(affine transformation)。
但是在DL中,人們往往並不那麼摳字眼,對於這兩種變換,一般都簡單地稱為線性變換。

1.3.1.2、激活函數

激活函數在DL中非常關鍵!
因為如果要提高一個神經網絡(neural network)的表示能力,往往需要加深網絡的深度。
然而如果只疊加多個線性變換的話,這等價於一個線性變換(大家可以推推看)!
所以只有在線性變換後接一個非線性變換(nonlinear transformation),即激活函數,才能逐漸加深網絡並提高表示能力。

激活函數有很多,常見的包括sigmoidtanhsoftmaxReLUGELUSwishMish等。
本文只講和BERT相關的激活函數:tanh、softmax、GELU。

1.3.1.2.1、tanh

激活函數的一個功能是調整輸入值的取值範圍。
tanh即雙曲正切函數,可以將\((-\infty, +\infty)\)的數映射到\((-1, 1)\),並且嚴格單調。
函數圖像如圖:

tanh在NLP&DL領域用得比較多。

1.3.1.2.2、softmax

softmax顧名思義,它可以對輸入的一組數值根據其大小給出每個數值的概率,數值越大,概率越高,且概率求和為\(1\)

假設輸入\(x_k\)\(k = 0, 1, …, (N-1)\),則輸出值\(y_k\)為:

\[y_k = \frac{exp(x_k)}{\sum_{i=0}^{N-1} exp(x_i)} \]

實際上,對於任意一個對數幾率(logit)\(x \in (-\infty, +\infty)\)\(x\)越大,表示某個事件發生的可能性越大,softmax可以將其轉化為概率,即將取值範圍映射到\((0, 1)\)

1.3.1.2.3、GELU

GELUGaussian Error Linear Units)是2016年6月提出的一個激活函數。
GELU相比ReLU曲線更為光滑,允許梯度更好地傳播。
GELU的想法類似於隨機失活,隨機失活是按照0-1分佈,又叫兩點分佈,也叫伯努利分佈(Bernoulli distribution),隨機通過輸入值;而GELU則是將這個概率分佈改成正態分佈(Normal distribution),也叫高斯分佈(Gaussian distribution),然後輸出期望。

假設輸入值是\(x\),輸出值是\(y\),那麼GELU就是:

\[y = x P(X \le x) \]

其中,\(X \sim \mathcal{N}(0, 1)\)\(P\)為概率。

GELU的函數圖像如圖:

其中藍線為ReLU函數圖像,橙線為GELU函數圖像。

1.3.1.3、多頭自注意力

多頭自注意力是Transformer的一大特色。
多頭自注意力的名字可以分成三個詞:多頭、自、注意力:

  • 注意力:是DL領域近年來最重要的創新之一!可以使模型以不同的方式對待不同的輸入(即分配不同的權重),而無視空間(即輸入向量排成線形、面形、樹形、圖形等拓撲結構)的形狀、大小、距離。
  • 自:是在普通的注意力基礎上修改而來的,可以表示輸入與自身的依賴關係。
  • 多頭:是對注意力中涉及的向量分別拆分計算,從而提高表示能力。

對於一般的多頭注意力,假設計算\(x\)\(H\))對\(y_i\)\(H\)),\(i = 0, 1, …, (L-1)\),的多頭注意力,則首先計算\(q\)(H)、\(k_i\)(H)、\(v_i\)(H):

\[ q = x W_q^T + b_q \\ k_i = y_i W_k^T + b_k \\ v_i = y_i W_v^T + b_v \\ \]

其中,\(W_z\)\(H \times H\))和\(b_z\)\(H\))分別為權重矩陣和偏置向量,\(z \in \{ q, k, v \}\)
然後將這三種向量等長度拆分成\(S\)個向量,稱為頭向量:

\[ q_j = [q_0; q_1; …; q_{S-1}] \\ k_{ij} = [k_{i0}; k_{i1}; …; k_{i, S-1}] \\ v_{ij} = [v_{i0}; v_{i1}; …; v_{i, S-1}] \\ \]

上式中的分號為串連操作,即把多個向量拼接起來組成一個更長的向量。
其中,每個頭向量長度都為\(D\),且\(S \times D = H\)

然後計算\(q_j\)\(k_{ij}\)的注意力分數\(s_{ij}\)

\[s_{ij} = \frac{q_j k_{ij}^T}{\sqrt{D}} \]

之後可以添加註意力掩碼(也可以不加),即令\(s_{mj} = -\infty\)\(m\)是需要添加掩碼的位置。
然後通過softmax計算注意力概率\(p_{ij}\)

\[p_{ij} = \frac{exp(s_{ij})}{\sum_{t=0}^{L-1} exp(s_{tj})} \]

之後對注意力概率進行隨機失活:

\[\hat{p}_{ij} = dropout(p_{ij}) \]

再之後計算輸出向量\(r_j\)\(D\)):

\[r_j = \sum_{i=0}^{L-1} \hat{p}_{ij} v_{ij} \]

最終的輸出向量是把每一頭的輸出向量串連起來:

\[r = [r_0; r_1; …; r_{S-1}] \]

其中\(r\)\(H\))為最終的輸出向量。

如果令\(x = y_n\)\(n \in \{ 0, 1, …, L-1 \}\),即\(x\)\(y_i\)中的某一個向量,那麼多頭注意力就變為多頭自注意力。

代碼如下:

代碼

# BERT之多頭自注意力
class BertMultiHeadSelfAtt(nn.Module):
	def __init__(self, config):
		super().__init__()
		# 注意力頭數
		self.num_heads = config.num_attention_heads
		# 注意力頭向量長度
		self.head_size = config.hidden_size // config.num_attention_heads

		self.query = nn.Linear(config.hidden_size, config.hidden_size)
		self.key = nn.Linear(config.hidden_size, config.hidden_size)
		self.value = nn.Linear(config.hidden_size, config.hidden_size)

		self.dropout = nn.Dropout(config.attention_probs_dropout_prob)

	# 輸入(batch_size * seq_length * hidden_size)
	# 輸出(batch_size * num_heads * seq_length * head_size)
	def shape(self, x):
		shape = (*x.shape[:2], self.num_heads, self.head_size)
		return x.view(*shape).transpose(1, 2)
	# 輸入(batch_size * num_heads * seq_length * head_size)
	# 輸出(batch_size * seq_length * hidden_size)
	def unshape(self, x):
		x = x.transpose(1, 2).contiguous()
		return x.view(*x.shape[:2], -1)

	def forward(self,
			inputs,  # 輸入(batch_size * seq_length * hidden_size)
			att_masks=None,  # 注意力掩碼(batch_size * seq_length * hidden_size)
	):
		mixed_querys = self.query(inputs)
		mixed_keys = self.key(inputs)
		mixed_values = self.value(inputs)

		querys = self.shape(mixed_querys)
		keys = self.shape(mixed_keys)
		values = self.shape(mixed_values)

		# 注意力分數(batch_size * num_heads * seq_length * seq_length)
		att_scores = querys.matmul(keys.transpose(2, 3))
		# 縮放注意力分數
		att_scores = att_scores / sqrt(self.head_size)
		# 添加註意力掩碼
		if att_masks is not None:
			att_scores = att_scores + att_masks

		# 注意力概率(batch_size * num_heads * seq_length * seq_length)
		att_probs = att_scores.softmax(dim=-1)
		# 隨機失活注意力概率
		att_probs = self.dropout(att_probs)

		# 輸出(batch_size * num_heads * seq_length * head_size)
		outputs = att_probs.matmul(values)
		outputs = self.unshape(outputs)
		return outputs  # 輸出(batch_size * seq_length * hidden_size)

其中,
num_attention_heads是注意力頭數,默認是12(bert-base-*)或16(bert-large-*);
attention_probs_dropout_prob是注意力概率的隨機失活概率,默認是0.1。

1.3.1.4、跳躍連接

跳躍連接也是DL領域近年來最重要的創新之一!
跳躍連接也叫殘差連接(residual connection)。
一般來說,傳統的神經網絡往往是一層接一層串連而成,前一層輸出作為後一層輸入。
而跳躍連接則是某一層的輸出,跳過若干層,直接輸入某個更深的層。
例如BERT的每個隱藏層中有兩個跳躍連接。

跳躍連接的作用是防止神經網絡梯度消失或梯度爆炸,使損失曲面(loss surface)更平滑,從而使模型更容易訓練,使神經網絡可以設置得更深。

按我個人的理解,一般來說,線性變換是最能保持輸入信息的,而非線性變換則往往會損失一部分信息,但是為了網絡的表示能力不得不線性變換與非線性變換多次堆疊,這樣網絡深層接收到的信息與最初輸入的信息比可能已經面目全非,而跳躍連接則可以讓輸入信息原汁原味地傳播得更深。

隱藏層代碼如下:

代碼

# BERT之隱藏層
class BertLayer(nn.Module):
	# noinspection PyUnresolvedReferences
	def __init__(self, config):
		super().__init__()
		# 多頭自注意力
		self.multi_head_self_att = BertMultiHeadSelfAtt(config)

		self.linear = nn.Linear(config.hidden_size, config.hidden_size)
		self.dropout = nn.Dropout(config.hidden_dropout_prob)
		self.layer_norm = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)

		# 升維線性變換
		self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
		# 激活函數,默認:GELU
		self.act_fct = F.gelu

		# 降維線性變換,使向量大小保持不變
		self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
		self.dropout_1 = nn.Dropout(config.hidden_dropout_prob)
		self.layer_norm_1 = nn.LayerNorm(config.hidden_size, eps=config.layer_norm_eps)
	def forward(self,
			inputs,  # 輸入(batch_size * seq_length * hidden_size)
			att_masks=None,  # 注意力掩碼(batch_size * seq_length * hidden_size)
	):
		outputs = self.multi_head_self_att(inputs, att_masks=att_masks)
		outputs = self.linear(outputs)
		outputs = self.dropout(outputs)
		att_outputs = self.layer_norm(outputs + inputs)  # 跳躍連接

		outputs = self.linear_1(att_outputs)
		outputs = self.act_fct(outputs)

		outputs = self.linear_2(outputs)
		outputs = self.dropout_1(outputs)
		outputs = self.layer_norm_1(outputs + att_outputs)  # 跳躍連接
		return outputs  # 輸出(batch_size * seq_length * hidden_size)

其中,
intermediate_size是中間一個升維線性變換升維后的長度,默認是3072(bert-base-*)或4096(bert-large-*)。

編碼器代碼如下:

代碼

# BERT之編碼器
class BertEnc(nn.Module):
	def __init__(self, config):
		super().__init__()
		# num_hidden_layers個隱藏層
		self.layers = nn.ModuleList([BertLayer(config)
			for _ in range(config.num_hidden_layers)])
	# noinspection PyTypeChecker
	def forward(self,
			inputs,  # 輸入(batch_size * seq_length * hidden_size)
			att_masks=None,  # 注意力掩碼(batch_size * seq_length)
	):
		# 調整注意力掩碼的值和形狀
		if att_masks is not None:
			device = inputs.device  # 設備(CPU或CUDA)
			dtype = inputs.dtype  # 數據類型(float16、float32或float64)
			shape = att_masks.shape  # 形狀(batch_size * seq_length)
			t = tc.zeros(shape, dtype=dtype, device=device)
			t[att_masks<=0] = -inf  # exp(-inf) = 0
			t = t[:, None, None, :]
			att_masks = t

		outputs = inputs
		for layer in self.layers:
			outputs = layer(outputs, att_masks=att_masks)
		return outputs  # 輸出(batch_size * seq_length * hidden_size)

其中,
num_hidden_layers是隱藏層數量,默認是12(bert-base-*)或24(bert-large-*)。

1.4、池化層

池化層是將[CLS]標記對應的表示取出來,並做一定的變換,作為整個序列的表示並返回,以及原封不動地返回所有的標記表示。
如圖:

其中,激活函數默認是tanh。

池化層代碼如下:

代碼

# BERT之池化層
class BertPool(nn.Module):
	def __init__(self, config):
		super().__init__()
		self.linear = nn.Linear(config.hidden_size, config.hidden_size)
		self.act_fct = F.tanh
	def forward(self,
			inputs,  # 輸入(batch_size * seq_length * hidden_size)
	):
		# 取[CLS]標記的表示
		outputs = inputs[:, 0]
		outputs = self.linear(outputs)
		outputs = self.act_fct(outputs)
		return outputs  # 輸出(batch_size * hidden_size)

1.5、輸出

主模型最後輸出所有的標記表示和整體的序列表示,分別用於針對每個標記的預測任務和針對整個序列的預測任務。

主模型代碼如下:

代碼

# BERT之預訓練模型抽象基類
class BertPreTrainedModel(PreTrainedModel):
	from transformers import BertConfig
	from transformers import BERT_PRETRAINED_MODEL_ARCHIVE_MAP
	from transformers import load_tf_weights_in_bert

	config_class = BertConfig
	pretrained_model_archive_map = BERT_PRETRAINED_MODEL_ARCHIVE_MAP
	load_tf_weights = load_tf_weights_in_bert
	base_model_prefix = 'bert'

	# 注意力頭剪枝
	def _prune_heads(self, heads_to_prune):
		pass
	# 參數初始化
	def _init_weights(self, module):
		config = self.config
		f = lambda x: x is not None and x.requires_grad
		if isinstance(module, nn.Embedding):
			if f(module.weight):
				# 正態分佈隨機初始化
				module.weight.data.normal_(mean=0.0, std=config.initializer_range)
		elif isinstance(module, nn.Linear):
			if f(module.weight):
				# 正態分佈隨機初始化
				module.weight.data.normal_(mean=0.0, std=config.initializer_range)
			if f(module.bias):
				# 初始為0
				module.bias.data.zero_()
		elif isinstance(module, nn.LayerNorm):
			if f(module.weight):
				# 初始為1
				module.weight.data.fill_(1.0)
			if f(module.bias):
				# 初始為0
				module.bias.data.zero_()
# BERT之主模型
class BertModel(BertPreTrainedModel):
	def __init__(self, config):
		super().__init__(config)
		self.config = config
		# 嵌入層
		self.emb = BertEmb(config)
		# 編碼器
		self.enc = BertEnc(config)
		# 池化層
		self.pool = BertPool(config)
		# 參數初始化
		self.init_weights()

	# noinspection PyUnresolvedReferences
	def get_input_embeddings(self):
		return self.emb.tok_emb
	def set_input_embeddings(self, embs):
		self.emb.tok_emb = embs

	def forward(self,
			tok_ids,  # 標記編碼(batch_size * seq_length)
			pos_ids=None,  # 位置編碼(batch_size * seq_length)
			sent_pos_ids=None,  # 句子位置編碼(batch_size * seq_length)
			att_masks=None,  # 注意力掩碼(batch_size * seq_length)
	):
		outputs = self.emb(tok_ids, pos_ids=pos_ids, sent_pos_ids=sent_pos_ids)
		outputs = self.enc(outputs, att_masks=att_masks)
		pooled_outputs = self.pool(outputs)
		return (
			outputs,  # 輸出(batch_size * seq_length * hidden_size)
			pooled_outputs,  # 池化輸出(batch_size * hidden_size)
		)

其中,
BertPreTrainedModel是預訓練模型抽象基類,用於完成一些初始化工作。

後記

本文詳細地介紹了BERT主模型的結構及其組件,了解它的構造以及代碼實現對於理解以及應用BERT有非常大的幫助。
後續兩篇文章會分別介紹BERT預訓練下游任務相關。

從BERT主模型的結構中,我們可以發現,BERT拋棄了RNN架構,而只用注意力機制來抽取長距離依賴(這個其實是Transformer架構的特點)。
由於注意力可以并行計算,而RNN必須串行計算,這就使得模型計算效率大大提升,於是BERT這類模型也能夠堆得很深。
BERT為了能夠同時做單句和雙句的序列和標記的預測任務,設計了[CLS][SEP]等特殊標記分別作為序列表示以及標記不同的句子邊界,整體採用了桶狀的模型結構,即輸入時隱狀態的形狀與輸出時隱狀態的形狀相等(只是在每個隱藏層有升維與降維操作,整體上詞嵌入長度保持不變)。
由於注意力機制對距離不敏感,所以BERT額外添加了位置特徵。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

FB行銷專家,教你從零開始的技巧

Uber等即時叫車碳排更高 首份量化研究:車隊應電動化、提升共乘比例

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化