PHP安全之道學習筆記2:編碼安全指南

編碼安全指南

編程本身就應該是一門藝術,而安全編程更是一種在刀尖上舞蹈的藝術,不僅要小心腳下的鋒利寒刃,更要小心來自網絡黑客或攻擊者的狂轟亂炸。
– by code artist

  • 1.hash比較的缺陷
    經過試驗發現,當Hash值以”0e”開頭且後面都為数字,當和数字進行比較的時候總會被判斷和0相等

例如:

var_dump(‘0e1327544’ == 0); // bool(true)

當密碼被md5計算后,可能會以”0e”開頭,下面這個例子可以繞過密碼驗證。
經過我的驗證PHP 7.1.x后沒有這種問題。

<?php
    $password_from_db = "0e23434";
    $password = "2323"; // 隨意的一個密碼。來自$_POST,即表單提交
    if ($password_from_db == md5($password)) {
        echo "login success!";
    } else {
        echo "login fails";
    }

更安全的hash比較:
可以使用內置函數hash_equals()來比較hash值。(PHP版本必須是5.6及其以上)

 if (hash_equals($password_from_db, md5($password)) {
     .....// other logic
 }
  • 2.bool比較的缺陷

json_decode和unserialize函數可能將部分結構解析成bool值,造成一些比較上的缺陷。

先舉例json_decode的案例:

<?php
$str = '{"user":true, "pass": true}';

$data = json_decode($str, true);

if ($data['user'] == 'root' && $data['pass'] == 'pass') {
    echo "login success\r\n";
} else {
    echo "login fails\r\n";
}

執行結果為:login success
這樣利用bool比較的漏洞就繞過了登錄或者授權驗證。

unserialize過程相逆,結果類似,也會出現安全問題。

正確的做法還是使用”===”來進行比較,這不光是php,包括一些其他腳本語言或者靜態語言,都請嚴謹地使用全等於符號進行比較。

  • 3.數值比較

PHP雖然是弱類型語言,但是數據類型也有數值範圍。對於整型而言,最大值為PHP_INT_MAX(即9223372036854775807)
攻擊者可以利用最大值越界,繞過一些驗證,如登錄、賬號充值等等。

舉例:

$a = 9223372036854775807;
$b = 9223372036854775827;
var_dump($a === $b); // bool(true)
var_dump($a % 100); // int(0)

由此,可見全等號(===)也不是萬能的,具體場景下要更小心。經驗證,PHP7.1.x后不會出現該問題,5.x的可能出現。

在實際業務邏輯裏面一定要注意判斷最大值問題,避免越界帶來的問題。

當使用超長浮點數變量的時候,PHP也會出錯。

<?php
$uid = 0.999999999999999999;
if ($uid == "1") {
    echo "search uid is 1 for data\r\n"; // 這裏PHP將$uid約等於1了,進入該判斷條件里的邏輯
}

同理,2.999999999999也會被當成3,這就是超越浮點數精度造成的偏差。

解決辦法有很多,最簡單的就是用is_int()函數進行判斷,如果不是整型,則報錯或做錯誤處理。

  • 4.switch缺陷

當用case判斷数字的時候,switch會把參數轉換成int類型進行計算,代碼如下:

<?php
$num = "1FreePHP";
switch ($num) {
    case 0: echo "nothing";
        break;
    case 1: echo "1 hacker here!";
        break;
    case 2: echo "2 hackers here";
        break;
    default:
        echo "confused";
}

最後輸出:1 hacker here!

所以,請使用is_numeric()函數進行判斷,保證數據類型如預期的一致。

  • 5.數組缺陷。
    in_array()和array_search()函數在沒有使用嚴格模式的情況下會用鬆散比較,可能造成一些錯誤。
    例如:
<?php
$arr = [0, 2, 3, "4"];
var_dump(in_array('freephp', $arr)); // true
var_dump(array_search('freephp', $arr)); // 0: 下標
var_dump(in_array('2freephp', $arr)); // true
var_dump(array_search('3freephp', $arr)); // 2: 下標

總的來說,PHP工程師對於這種弱類型語言的使用上要更加小心,雖然平時寫起業務來“短平快”,但安全編程也不要忘記,能用上hint的高版本PHP就進行標註清楚入參、出參,讓PHP代碼更加健壯。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

平板收購,iphone手機收購,二手筆電回收,二手iphone收購-全台皆可收購

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

HTTP基礎及telnet簡單命令

一、HTTP概況

 

  20世紀90年代初期,一個主要的新興應用即萬維網(World Wide Web)登上了舞台。Web是一個引起公眾注意的因特網應用。Web的應用層協議是超文本傳輸協議(HTTP),它是Web的核心。HTTP由兩個程序實現:一個客戶程序和一個服務器程序。客戶程序和服務器程序運行在不同的端系統中,通過交換HTTP報文進行會話。HTTP會話定義了這些報文的結構以及客戶和服務器進行報文交換的方式。

  Web頁面(也叫文檔)是由對象組成的。一個對象只是一個文件,諸如一個HTML文件、一個JPEG圖形、一個Java小程序或一個視頻片段這樣的文件,且他們可通過一個URL地址尋址。多數Web頁面含有一個HTML基本文件以及幾個引用對象。例如,如果一個Web頁面包含HTML基本文件和5個JPEG圖形,那麼這個Web頁面6個對象:一個HTML基本文件加5個圖形。HTML基本文件通過對象的URL地址引用頁面中的其他對象。每個URL地址由兩部分組成:存放對象的服務器主機名和對象的路徑名。Web瀏覽器實現了HTTP的客戶端,Web服務器實現了HTTP的服務器端,它用於存儲Web對象,每個對象由URL尋址。

  HTTP定義了Web客戶向Web服務器請求Web頁面的方式,以及服務器向客戶傳送Web頁面的方式,其基本思想就是當用戶請求一個Web頁面(如點擊一個超鏈接)時,瀏覽器向服務器發出對該頁面中所包含對象的HTTP請求報文,服務器接收到請求並用包含這些對象的HTTP響應報文進行響應。

  HTTP使用TCP作為它的支撐運輸協議(而不是在UDP上運行)。HTTP客戶首先發起一個與服務器的TCP連接。一旦連接建立,該瀏覽器和服務器進程就可以通過套接字接口訪問TCP。客戶向它的套接字接口發送HTTP請求報文並從它的套接字接口接收HTTP響應報文。類似的,服務器從它的套接字接口接收HTTP請求報文和向它的套接字接口發送HTTP響應報文。一旦客戶向他的套接字接口發送了一個請求報文,該報文就脫離了客戶控制並進入TCP的控制。TCP為HTTP提供可靠數據傳輸服務。這意味着,一個客戶進程發出的每個HTTP請求報文最終能完整地到達服務器;類似的,服務器進程發出的每個HTTP響應報文最終能完整地到達客戶。

  注意到下列現象很重要:服務器向客戶發送被請求的文件,而不存儲任何關於該客戶的狀態信息。假如某個特定的客戶在短短的幾秒鐘內兩次請求同一個對象,服務器並不會因為剛剛為該客戶提供了該對象就不再做出反應,而是重新發送該對象,就像服務器已經完全忘記不久之前所做過的事一樣。因為HTTP服務器並不保存關於客戶的任何信息,所以我們說HTTP是一個無狀態協議

 

二、非持續連接和持續連接

 

  在許多因特網應用程序中,客戶和服務器在一個相當長的時間範圍內通信,其中客戶發出一系列請求並且服務器對每個請求進行響應。依據應用程序以及該應用程序的使用方式,這一系列請求可以以規則的間隔周期性的或者間斷性的一個接一個發出。當這種客戶-服務器的交互是經TCP進行的,應用程序的研製者就要做一個重要決定,即每個請求/響應對是經一個單獨的TCP連接發送,還是所有的請求及其相應經相同的TCP連接發送呢?採用前一種方法,該應用程序被稱為使用非持續連接;採用后一種方法,該應用程序被稱為使用持續連接。如HTTP既能夠使用非持續連接,也能夠使用持續連接。儘管HTTP在默認方式下使用持續連接,HTTP客戶和服務器也能配置成非持續連接。

1.採用非持續連接的HTTP

  我們看看在非持續連接情況下,從服務器向客戶傳送一個Web頁面的步驟。假設該頁面含有一個HTML基本文件和10個JPEG圖形,並且這11個對象位於同一台服務器上。該HTML文件的URL為:我們看看發生了什麼情況:

  • HTTP客戶進程在端口號80發起一個到服務器的TCP連接,該端口號是HTTP的默認端口。在客戶和服務器上分別有一個套接字與該連接相關聯。

  • HTTP客戶經它的套接字向該服務器發送一個HTTP請求報文。請求報文中包含了路徑名/someDepartment/home.index。

  • HTTP服務器進程經它的套接字接收該請求報文,從其存儲器(RAM或磁盤)中檢索出對象,在一個HTTP響應報文中封裝對象,並通過其套接字向客戶發送響應報文。

  • HTTP服務器進程通知TCP斷開該TCP連接。(但是直到TCP確認客戶已經完整的收到響應報文為止,它才會實際中斷連接。

  • HTTP客戶接收響應報文,TCP連接關閉。該報文指出封裝的對象是一個HTML文件,客戶從響應報文中提取出該文件,檢查該HTML文件,得到對10個JPEG圖形的引用。

  • 對每個引用的JPEG圖形對象重複前4個步驟。

  上面的步驟舉例說明了非持續連接的使用,其中每個TCP連接在服務器發送一個對象后關閉,即該連接並不為其他的對象而持續下來。值得注意的是每個TCP來接只傳輸一個請求報文和響應報文。

     在上面描述的步驟中,我們有意沒有明確客戶獲得這10個JPEG圖形對象是使用10個串行的TCP連接,還是某些JPEG對象使用了一些并行的TCP連接。事實上,用戶能配置現代瀏覽器以控制并行度。在默認方式下,大部分瀏覽器打開5~10個并行的TCP連接,而每條連接處理一個請求響應事務。如果用戶願意,最大并行連接數可以設置為1,這樣10條連接就會串行建立。

  我們來簡單估算一下從客戶請求HTML基本文件起到該客戶收到整個文件止所花費的時間。為此,我們給出往返時間(Round-Trip Time,RTT)的定義,該時間是指一個短分組從客戶到服務器然後再返回客戶所花費的時間。RTT包括分組傳播時延、分組在中間路由器和交換機上的排隊時延以及分組處理時延。現在考慮當用戶點擊超鏈接時會發生什麼現象。如圖2-7所示,這引起瀏覽器在它和Web服務器之間發起一個TCP連接;這涉及一次“三次握手”過程。即客戶向服務器發送一個小TCP報文段,服務器用一個小TCP報文段做出確認和響應,最後,客戶向服務器返回確認。三次握手中前兩個部分所耗費的時間佔用了一個RTT。完成了三次握手的前兩個部分后,客戶結合三次握手的第三部分(確認)向該TCP連接發送一個HTTP請求報文。一旦該請求報文到達服務器,服務器就在該TCP連接上發送HTML文件。該HTTP請求/響應用去了另一個RTT。因此,粗略地將,總的響應時間就是兩個RTT加上服務器傳輸HTML文件的時間。

2.採用持續連接的HTTP

  非持續連接有一些缺點。首先,必須為每一個請求的對象建立和維護一個全新的連接。對於每個這樣的連接,在客戶和服務器中都要分配TCP的緩衝區和保持TCP變量,這給Web服務器帶來了嚴重的負擔,因為一台Web服務器可能同時服務於數以百計不同的客戶的請求。第二,就像我們剛描述的那樣,每一個對象經受兩倍RTT的交付時延,即一個RTT用於創建TCP,另一個RTT用於請求和接收一個對象。

  在採用持續連接的情況下,服務器在發送響應后保持該TCP連接打開。在相同的客戶與服務器之間的後續請求和響應報文能夠通過相同的連接進行傳送。特別是,一個完整的Web頁面(上例中的HTML基本文件加上10個圖形)可以用單個持續TCP連接進行傳送。更有甚者,位於同一台服務器的多個Web頁面在從該服務器發送給同一個客戶時,可以在單個持續TCP連接上進行。可以一個接一個地發出對對象的這些請求,而不必等待對未決請求(流水線)的回答。一般來說,如果一條連接經過一定的時間間隔(一個可配置的超時間隔)仍未被使用,HTTP服務器就關閉該連接。HTTP的默認模式是使用帶流水線的持續連接。

三、HTTP報文格式

  HTTP報文有兩種:請求報文和響應報文。

1.HTTP請求報文

  下面提供了一個典型的HTTP請求報文:

GET /somedir/page.html HTTP/1.1

Host:

Connection: close

User-agent: Mozilla/5.0

Accept-language: fr

  通過仔細觀察這個簡單的請求報文,我們就能知道很多東西。首先,我們看到該報文是用普通的ASCII文本書寫的,我們看到該報文由5行組成,每行由一個回車和換行符結束。最後一行后再附加一個回車換行符。一個請求報文能夠具有更多的行或者至少為一行。請求行的方法字段可以取幾種不同的值,包括GET、POST、HEAD、PUT和DELETE。當瀏覽器請求一個對象時,使用GET方法,在URL字段帶有請求對象的標識,在本例中,該瀏覽器正在請求對象/somedir/page.html。其版本字段是自解釋的;在本例中,瀏覽器實現的是HTTP/1.1版本。現在我們看看本例的首部行。首部行Host: 指明了對象所在的主機。你也許認為該首部行是不必要的,因為在該主機中已經有一條TCP連接存在了,但是,該首部行提供的信息是Web代理高速緩存所要求的。通過包含Connection: close首部行,該瀏覽器告訴服務器不希望麻煩地使用持續連接,它要求服務器在發送完被請求的對象后就關閉這條連接。User-agent: 首部行用來指明用戶代理,即向服務器發送請求的瀏覽器類型。這裏瀏覽器類型是Mozilla/5.0,即Firefox瀏覽器。這個首部行是有用的,因為服務器可以有效地為不同類型的用戶代理實際發送相同對象的不同版本。(每個版本都由相同的URL尋址。)最後,Accept-language: 首部行表示用戶想得到該對象的法語版本。如果服務器中沒有這樣的對象的話,服務器應當發送它的默認版本。

  接下來看看如圖2-8所示的一個請求報文的通用格式。你可能注意到了在首部行(和附加的回車和換行)後有一個“實體主體”。使用GET方法是實體主體為空,而使用POST方法時才使用該實體主體。當用戶提交表單時,HTTP客戶常常使用POST方法,例如當用戶向搜索引擎提供搜索關鍵詞時。使用POST報文時,用戶仍可以向服務器請求一個Web頁面,但Web頁面的特定內容依賴於用戶在表單字段中輸入的內容。如果方法字段的值為POST時,則實體主體中包含的就是用戶在表單字段中的輸入值。

  當然,如果不提“用表單生成的請求報文不是必須使用POST方法”這一點,那將是失職。HTML表單經常使用GET方法,並在(表單字段中)所請求的URL中包括輸入的數據。例如,一個表單使用GET方法,它有兩個字段,分別填寫的是“monkeys”和“bananas”,這樣,該URL結構為? monkeys&bananas。

  HEAD方法類似GET方法。當服務器收到使用HEAD方法的請求時,將會用一個HTTP報文進行響應,但是並不返回請求對象。應用程序開發者常用HEAD方法進行調試跟蹤。PUT方法常與Web發行工具聯合使用,它允許用戶上傳對象到指定的Web服務器上指定的路徑(目錄)。PUT也被那些需要向Web服務器上傳對象的應用程序使用。DELETE方法允許用戶或者應用程序刪除Web服務器上的對象。

2.HTTP響應報文

  下面我們提供了一條典型的HTTP響應報文。該響應報文可以是對剛剛討論的例子中請求報文的響應。

HTTP/1.1 200 OK

Connection: close

Date: Tue, 09 Aug 2011 15:44:04 GMT

Server: Apache/2.2.3 (CentOS)

Last-Modified: Tue, 09 Aug 2011 15:11:03 GMT

Content-Length: 6821

Content-Type: text/html

(data data data data data …)

  我們仔細看這個響應報文。實體主體部分是報文的主要部分,即它包含了所請求的對象本身(表示為data data data data data …)。我們現在來看看首部行。服務器用Connection:close首部行告訴客戶,發送完報文後將關閉該TCP連接。Date:首部行指示服務器產生併發送該響應報文的日期和時間。值得一提的是,這個時間不是指對象創建或者最後修改的時間;而是服務器從它的文件系統中檢索到該對象,插入到響應報文,併發送響應報文的時間。Server:首部行指示該報文是由一台Apache Web服務器產生的,它類似於HTTP請求報文中的User-agent:首部行,Last-Modified:首部行指示了對象創建或者最後修改的日期和時間。Last-Modified:首部行對極可能在本地客戶也可能在網絡緩存服務器(代理服務器)上的對象緩存來說非常重要。Content-Length:首部行知識了被發送對象中的字節數。Content-Type:首部行指示了實體主體中的對象是HTML文本。(該對象類型應該正式地由Content-Type:首部行而不是用文件擴展名來指示。)

  看過一個例子后,我們再來查看響應報文的通用格式(如圖2-9所示)。我們補充說明一下狀態碼和它們對應的短語。狀態碼及其相應的短語指示了請求的結果。一些常見的狀態碼和相關的短語包括:

  • 200 OK:請求成功,信息在返回的響應報文中。

  • 301 Moved Permanently:請求的對象已經被永久轉移了,新的URL定義在響應報文的Location:首部行中。**客戶軟件將自動獲取新的URL。

  • 400 Bad Request:一個通用差錯代碼,指示該請求不能被服務器理解。

  • 404 Not Found:被請求的文檔不在服務器上。

  • 505 HTTP Version Not Supported:服務器不支持請求報文使用的HTTP協議版本。

  你想看一下真正的HTTP響應報文嗎?很容易做到。首先用Telnet登錄到你喜歡的Web服務器上,接下來輸入一個只有一行的請求報文去請求放在該服務器上的某些對象。

  在linux終端輸入完telnet 80后,會是下面這種情況:

  然後按下ctrl + ]呼出telnet命令行出現下面這種情況:

  先按下回車鍵,再輸入HTTP請求,最終得到HTTP響應如下:

  在telnet命令行上輸入quit退出telnet,如下圖:

 

 

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品在網路上成為最夯、最多人討論的話題?

※高價收購3C產品,價格不怕你比較

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

軟件開發要質量還是要效率?

質量和效率似乎永遠都是一對冤家,儘管我們都希望既有質量,又有效率。

把“質量”當做宗旨的企業,通常都有一系列的規章制度,甚至是繁重且冗餘的流程用來約束軟件開發過程中種種“有意”或“無意”的威脅軟件質量的行為。

把“效率”當做宗旨的企業,通常其內部並無嚴格的規章制度,甚至寬鬆到一個人都可以輕鬆地完成從刪庫到跑路。

從事IT行業的相關人員大多知道,軟件開發不同於一般性的勞動,它並不能單純地增加人手就能縮減開發周期,也就是說一個軟件1個人開發需要10天,這並不意味着10個人就可以1天開發完成。並且在軟件開發的過程中,由於需要“適應市場的快速發展”,常常伴隨需求變更等不可預知的問題。也就是在前期所做的工作可能因為某個需求而全部推倒重來。

下面從要質量還是要效率兩個方面來闡述,不同的側重點所帶來的的問題。

我們首先假設,公理P1:作為IT行業的從業者(開發、測試、產品等)都知道,軟件開發具有一定的不可預知性

那麼在這個前提下,傾向於“質量”的企業通常情況下有以下做法:

  • 通過規章制度讓軟件開發具有一定的可預知性

讓軟件開發具有一定的可預知性,這種方式有很多種實現,比較常見的手段是讓需求變更的成本上升。一旦進入開發階段(含設計階段),需求不得隨意變更,這種方式對開發人員相對比較友好,開發人員不再被隨意變更的需求所打擾,但同時也對產品經理提出了更多的要求。這要求產品經理需要有高超的業務能力,以及一定的前瞻性。除了讓需求變更的成本上升以外,通常也會在前期做大量的工作,包括需求評審、文檔設計、設計評審等會議,在軟件開發的中後期不斷地進行代碼評審等工作。這一系列的規章制度流程,能使得軟件開發不再隨心所欲,而是有章可循。顯而易見,這樣“傳統”的開發形式,勢必帶來效率的下降。例如我曾經見過有的公司,一年最多發布2個版本。這在如今快速的互聯網發展中是不可接受的。

而傾向於“效率”的企業,也就是通常所說的互聯網公司對於效率的提升通常採取以下手段:

  • 通過縮短開發周期使軟件開發具有一定的可預知性

目前在部分互聯網公司所倡導的“敏捷開發”實際上就是通過縮短開發周期來使軟件具有一定的可預知性。我們在開頭假設了了公理P1,軟件開發具有一定的不可預知性。並且開發周期越長,不可預知性越大。注重質量的公司,可能更傾向於提高需求變更的成本,而注重效率的公司則縮短開發周期。兩者都是為了使得軟件開發變得可控。但兩個不同的方式則導致了兩個不同的傾向。

縮短開發周期的確會讓效率變得更高,起碼能更快的適應市場的需求。那為什麼會說縮短開發周期會使得質量降低呢?

其實這是一個顯而易見的道理,縮短開發周期,理論上來講似乎就能縮短開發時間。10個需求需要做10天,平均1個需求不就只需要1天嗎?那麼我為了提高我的效率,快速響應市場變化,我就採取敏捷開發的方式,這樣不就既滿足了效率,同時也滿足了開發時間,這樣的做法似乎並不會降低軟件開發的質量。這麼想的通常是沒有從事過技術研發的同學。仍然回到公理P1,軟件開發具有一定的不可預知性。我在做當前開發的時候,所採取的的設計基本上只適用於當前的業務模型,對於未來幾乎一無所知。隨着系統不斷地快速迭代,一次又一次的在原有的系統上疊加新的功能修改刪除舊的功能。這對於軟件開發者可以說是災難性的,沒有哪一個系統架構師能遇見未來的所有可能。“天下武功唯快不破”,快是快了,代碼後院也快起火了。

天底下沒有公司敢說我不注重質量,我只注重效率。無論是什麼公司都會採取以下手段去保證軟件質量。

  • 通過一定的經濟利益懲罰手段

一定的懲罰手段,簡單粗暴地將開發人員的bug數與績效掛鈎。不過直接將bug數與績效掛鈎的情況比較少,大多情況是bug的reopen次數,以及是否有新引入的bug。其中reopen是較為常見的一種懲罰手段,同樣也能較好地推動軟件質量提升。

事實上,並沒有哪一種絕對完美的兼顧了質量和效率,對於目前的互聯網公司大多所採用的是快速迭代的開發方式。但這並不代表採用這種方式的公司質量就一定低下。

“快速適應市場的變化”這本身也是一種需求,採取快速迭代的方式實際上也是為了滿足這一“需求”。阿里巴巴集團CTO行癲曾談到過,“最早,業務比技術跑的快,技術一直追業務,因為業務增長實在太快了。前兩年我覺得是技術推動業務,特別是人工智能興起的之後,包括我們程序化交易、廣告平台、千人千面、推薦、搜索大量用算法和AI,包括客服等等大量用數據智能在驅動業務”。

“業務比技術跑得快”,這意味着一定一個快速迭代的過程。而後來“技術推動業務”,意味着技術走在了業務的前面,反倒是技術追着業務打。這其中儘管並未提及質量,但我認為技術能推動業務不斷向前跑,一定是因為有堅實的技術後盾做支撐,而堅實的技術後盾也就意味着有超高的軟件質量

所以,在質量與效率的權衡利弊平衡中,不妨回過頭來重新審視技術的重要性。在滿足“市場快速變化”這一需求的同時,不要忘記技術也會負債,欠得越多越不牢靠。

這是一個能給程序員加buff的公眾號 (CoderBuff)

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※高價收購3C產品,價格不怕你比較

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

3c收購,鏡頭 收購有可能以全新價回收嗎?

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

HBase 基本入門篇

目錄

無論是 NoSQL,還是大數據領域,HBase 都是非常”炙熱”的一門數據庫。
本文將對 HBase 做一些基礎性的介紹,旨在入門。

一、簡介

HBase 是一個開源的、面向列的非關係型分佈式數據庫,目前是Hadoop體系中非常關鍵的一部分。
在最初,HBase是基於谷歌的 BigTable 原型實現的,許多技術來自於Fay Chang在2006年所撰寫的Google論文”BigTable”。與 BigTable基於Google文件系統(File System)一樣,HBase則是基於HDFS(Hadoop的分佈式文件系統)之上而開發的。

HBase 採用 Java 語言實現,在其內部實現了BigTable論文提到的一些壓縮算法、內存操作和布隆過濾器等,這些能力使得HBase 在海量數據存儲、高性能讀寫場景中得到了大量應用,如 Facebook 在 2010年11 月開始便一直選用 HBase來作為消息平台的存儲層技術。
HBase 以 Apache License Version 2.0開源,這是一種對商業應用友好的協議,同時該項目當前也是Apache軟件基金會的頂級項目之一。

有什麼特性

  • 基於列式存儲模型,對於數據實現了高度壓縮,節省存儲成本
  • 採用 LSM 機制而不是B(+)樹,這使得HBase非常適合海量數據實時寫入的場景
  • 高可靠,一個數據會包含多個副本(默認是3副本),這得益於HDFS的複製能力,由RegionServer提供自動故障轉移的功能
  • 高擴展,支持分片擴展能力(基於Region),可實現自動、數據均衡
  • 強一致性讀寫,數據的讀寫都針對主Region上進行,屬於CP型的系統
  • 易操作,HBase提供了Java API、RestAPI/Thrift API等接口
  • 查詢優化,採用Block Cache 和 布隆過濾器來支持海量數據的快速查找

與RDBMS的區別

對於傳統 RDBMS 來說,支持 ACID 事務是數據庫的基本能力,而 HBase 則使用行級鎖來保證寫操作的原子性,但是不支持多行寫操作的事務性,這主要是從靈活性和擴展性上做出的權衡。

ACID 要素包含 原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)以及持久性(Durability)

總體來說, HBase 與傳統關係數據庫的區別,如下錶所示:

特性 HBase RDBMS
硬件架構 類似於 Hadoop 的分佈式集群,硬件成本低廉 傳統的多核系統,硬件成本昂貴
容錯性 由軟件架構實現,由於由多個節點組成,所以不擔心一點或幾點宕機 一般需要額外硬件設備實現 HA 機制
數據庫大小 PB GB、TB
數據排布方式 稀疏的、分佈的多維的 Map 以行和列組織
數據類型 Bytes 豐富的數據類型
事物支持 ACID 只支持單個 Row 級別 全面的 ACID 支持,對 Row 和表
查詢語言 只支持 Java API (除非與其他框架一起使用,如 Phoenix、Hive) SQL
索引 只支持 Row-key,除非與其他技術一起應用,如 Phoenix、Hive 支持
吞吐量 百萬查詢/每秒 數千查詢/每秒

二、數據模型

下面,我們以關係型數據庫的一個數據表來演示 HBase 的不同之處。 先來看下面這張表:

ID 設備名 狀態 時間戳
1 空調 打開 20190712 10:05:01
2 電視機 關閉 20190712 10:05:08

這裏記錄的是一些家庭設備上報的狀態數據(DeviceState),其中包括設備名、狀態、時間戳這些字段。

在 HBase 中,數據是按照列族(Column Family,簡稱CF)來存儲的,也就是說對於不同的列會被分開存儲到不同的文件。
那麼對於上面的狀態數據表來說,在HBase中會被存儲為兩份:

列族1. 設備名

Row-Key CF:Column-Key Timestamp Cell Value
1 DeviceState:設備名 20190712 10:05:01 空調
2 DeviceState:設備名 20190712 10:05:08 電視機

列族2. 狀態

Row-Key CF:Column-Key Timestamp Cell Value
1 DeviceState:狀態 20190712 10:05:01 打開
2 DeviceState:狀態 20190712 10:05:08 關閉

這裏Row-key是唯一定位數據行的ID字段,而Row-key 加上 CF、Column-Key,再加上一個時間戳才可以定位到一個單元格數據。
其中時間戳用來表示數據行的版本, 在HBase中默認會有 3 個時間戳的版本數據,這意味着對同一條數據(同一個Rowkey關聯的數據)進行寫入時,最多可以保存3個版本。

在查詢某一行的數據時,HBase需要同時從兩個列族(文件)中進行查找,最終將結果合併后返回給客戶端。 由此可見如果列族太多,則會影響讀取的性能,在設計時就需要做一些權衡。

由此可見,HBase的使用方式與關係型數據庫是大不相同的,在使用 HBase 時需要拋棄許多關係型數據庫的思維及做法,比如強類型、二級索引、表連接、觸發器等等。

然而 HBase 的靈活性及高度可伸縮性卻是傳統 RDBMS 無法比擬的。

三、安裝HBase

單機環境安裝

  1. 準備JDK環境

確保環境上JDK已經裝好,可執行java -version確認:

host:/home/hbase # java -version
openjdk version "1.8.0_201"
OpenJDK Runtime Environment (build 1.8.0_201-Huawei_JDK_V100R001C00SPC060B003-b10)
OpenJDK 64-Bit Server VM (build 25.201-b10, mixed mode)
  1. 下載軟件

官網的下載地址頁面:

選擇合適的版本,比如1.4.10。 下載后解壓:

wget http://archive.apache.org/dist/hbase/2.1.5/hbase-2.1.5-bin.tar.gz
tar -xzvf hbase-2.1.5-bin.tar.gz
mkdir -p /opt/local
mv hbase-2.1.5 /opt/local/hbase

配置HBase執行命令路徑:

export HBASE_HOME=/opt/local/hbase
export PATH=$PATH:$HBASE_HOME/bin
  1. 配置軟件

vim conf/hbase-env.sh

#JDK安裝目錄
export JAVA_HOME=/usr/local/jre1.8.0_201
#配置hbase自己管理zookeeper
export HBASE_MANAGES_ZK=true

vim conf/hbase-site.xml

<configuration>

  <!-- zookeeper端口  -->
  <property>
      <name>hbase.zookeeper.property.clientPort</name>
      <value>2182</value>                                                                                                                                           
  </property>

  <!--  HBase 數據存儲目錄 -->
  <property>
    <name>hbase.rootdir</name>
    <value>file:///opt/local/hbase/data</value>
  </property>

  <!-- 用於指定 ZooKeeper 數據存儲目錄 -->
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/opt/local/hbase/data/zookeeper</value>
  </property>

  <!-- 用於指定臨時數據存儲目錄 -->
  <property>
    <name>hbase.tmp.dir</name>
    <value>/opt/local/hbase/temp/hbase-${user.name}</value>
  </property>
</configuration>

其中 hbase.rootdir 和 hbase.zookeeper.property.dataDir 都用來指定數據存放的目錄,默認情況下hbase會使用/tmp目錄,這顯然是不合適的。
配置了這兩個路徑之後,hbase會自動創建相應的目錄。

關於更多的參數設定可

  1. 啟動軟件
start-hbase.sh

此時查看 logs/hbase-root-master-host-xxx.log,如下:

2019-07-11 07:37:23,654 INFO  [localhost:33539.activeMasterManager] hbase.MetaMigrationConvertingToPB: hbase:meta doesn't have any entries to update.
2019-07-11 07:37:23,654 INFO  [localhost:33539.activeMasterManager] hbase.MetaMigrationConvertingToPB: META already up-to date with PB serialization
2019-07-11 07:37:23,664 INFO  [localhost:33539.activeMasterManager] master.AssignmentManager: Clean cluster startup. Assigning user regions
2019-07-11 07:37:23,665 INFO  [localhost:33539.activeMasterManager] master.AssignmentManager: Joined the cluster in 11ms, failover=false
2019-07-11 07:37:23,672 INFO  [localhost:33539.activeMasterManager] master.TableNamespaceManager: Namespace table not found. Creating...

檢查進程情況,發現進程已經啟動

ps -ef |grep hadoop
root     11049 11032  2 07:37 pts/1    00:00:20 /usr/local/jre1.8.0_201/bin/java -Dproc_master -XX:OnOutOfMemoryError=kill -9 %p -XX:+UseConcMarkSweepGC -XX:PermSize=128m -XX:MaxPermSize=128m -XX:ReservedCodeCacheSize=256m -Dhbase.log.dir=/opt/local/hbase/logs -Dhbase.log.file=hbase-root-master-host-192-168-138-148.log -Dhbase.home.dir=/opt/local/hbase -Dhbase.id.str=root -Dhbase.root.logger=INFO,RFA -Dhbase.security.logger=INFO,RFAS org.apache.hadoop.hbase.master.HMaster start
root     18907 30747  0 07:50 pts/1    00:00:00 grep --color=auto hadoop

通過JPS(JDK自帶的檢查工具) 可以看到當前啟動的Java進程:

# jps
5701 Jps
4826 HMaster
1311 jar

查看 data目錄,發現生成了對應的文件:

host:/opt/local/hbase/data # ls -lh .
total 36K
drwx------. 4 root root 4.0K Jul 11 08:08 data
drwx------. 4 root root 4.0K Jul 11 08:08 hbase
-rw-r--r--. 1 root root   42 Jul 11 08:08 hbase.id
-rw-r--r--. 1 root root    7 Jul 11 08:08 hbase.version
drwx------. 2 root root 4.0K Jul 11 08:08 MasterProcWALs
drwx------. 2 root root 4.0K Jul 11 08:08 oldWALs
drwx------. 3 root root 4.0K Jul 11 08:08 .tmp
drwx------. 3 root root 4.0K Jul 11 08:08 WALs
drwx------. 3 root root 4.0K Jul 11 08:08 zookeeper

關於運行模式
HBase啟動時默認會使用單機模式,此時 Zookeeper和 HMaster/RegionServer 會運行在同一個JVM中。
以standalone模式啟動的HBase會包含一個HMaster、RegionServer、Zookeeper實例,此時 HBase 會直接使用本地文件系統而不是HDFS。

通過將 conf/hbase-site.xml中的 hbase.cluster.distributed 配置為true,就是集群模式了。
在這個模式下,你可以使用分佈式環境進行部署,或者是”偽分佈式”的多進程環境。

<configuration>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
</configuration>

需要注意的是,如果以standalone啟動的話,HMaster、RegionServer端口都是隨機的,無法通過配置文件指定。

四、基本使用

打開HBase Shell

hbase shell

執行status命令

Version 2.1.5, r76ab087819fe82ccf6f531096e18ad1bed079651, Wed Jun  5 16:48:11 PDT 2019

hbase(main):001:0> status
1 active master, 0 backup masters, 1 servers, 0 dead, 2.0000 average load

這表示有一個Master在運行,一個RegionServer,每個RegionServer包含2個Region。

表操作

  • 創建DeviceState表
hbase(main):002:0> create "DeviceState", "name:c1", "state:c2"

=> Hbase::Table - DeviceState

此時,已經創建了一個DeviceState表,包含name(設備名稱)、state(狀態)兩個列。

查看錶信息:

hbase(main):003:0> list
TABLE
DeviceState
1 row(s) in 0.0090 seconds

=> ["DeviceState"]

hbase(main):003:0> describe "DeviceState"
Table DeviceState is ENABLED
DeviceState
COLUMN FAMILIES DESCRIPTION
{NAME => 'name', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSIO
N => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 'state', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSI
ON => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
2 row(s) in 0.0870 seconds
  • 寫入數據

通過下面的命令,向DeviceState寫入兩條記錄,由於有兩個列族,因此需要寫入四個單元格數據:

put "DeviceState", "row1", "name", "空調"
put "DeviceState", "row1", "state", "打開"
put "DeviceState", "row2", "name", "電視機"
put "DeviceState", "row2", "state", "關閉"
  • 查詢數據

查詢某行、某列

hbase(main):012:0> get "DeviceState","row1"
COLUMN                                      CELL
 name:                                      timestamp=1562834473008, value=\xE7\x94\xB5\xE8\xA7\x86\xE6\x9C\xBA
 state:                                     timestamp=1562834474630, value=\xE5\x85\xB3\xE9\x97\xAD
1 row(s) in 0.0230 seconds

hbase(main):013:0> get "DeviceState","row1", "name"
COLUMN                                      CELL
 name:                                      timestamp=1562834473008, value=\xE7\x94\xB5\xE8\xA7\x86\xE6\x9C\xBA
1 row(s) in 0.0200 seconds

掃描表

hbase(main):026:0> scan "DeviceState"
ROW                                         COLUMN+CELL
 row1                                       column=name:, timestamp=1562834999374, value=\xE7\xA9\xBA\xE8\xB0\x83
 row1                                       column=state:, timestamp=1562834999421, value=\xE6\x89\x93\xE5\xBC\x80
 row2                                       column=name:, timestamp=1562834999452, value=\xE7\x94\xB5\xE8\xA7\x86\xE6\x9C\xBA
 row2                                       column=state:, timestamp=1562835001064, value=\xE5\x85\xB3\xE9\x97\xAD
2 row(s) in 0.0250 seconds

查詢數量

hbase(main):014:0> count "DeviceState"
2 row(s) in 0.0370 seconds

=> 1
  • 清除數據

刪除某列、某行

delete "DeviceState", "row1", "name"
0 row(s) in 0.0080 seconds

hbase(main):003:0> deleteall "DeviceState", "row2"
0 row(s) in 0.1290 seconds

清空整個表數據

hbase(main):021:0> truncate "DeviceState"
Truncating 'DeviceState' table (it may take a while):
 - Disabling table...
 - Truncating table...
0 row(s) in 3.5060 seconds

刪除表(需要先disable)

hbase(main):006:0> disable "DeviceState"
0 row(s) in 2.2690 seconds

hbase(main):007:0> drop "DeviceState"
0 row(s) in 1.2880 seconds

五、FAQ

  • 啟動時提示 ZK 端口監聽失敗:
    Could not start ZK at requested port of 2181. ZK was started at port: 2182. Aborting as clients (e.g. shell) will not be able to find this ZK quorum

原因
HBase需要啟動Zookeeper,而本地的2181端口已經被啟用(可能有其他Zookeeper實例)

解決辦法
conf/hbase-site.xml中修改hbase.zookeeper.property.clientPort的值,將其修改為2182,:

<configuration>
  <property>
      <name>hbase.zookeeper.property.clientPort</name>
      <value>2182</value>                                                                                                                                           
  </property>
</configuration>
  • 啟動HBase Shell時提示java.lang.UnsatisfiedLinkError

原因
在執行hbase shell期間,JRuby會在“java.io.tmpdir”路徑下創建一個臨時文件,該路徑的默認值為“/tmp”。如果為“/tmp”目錄設置NOEXEC權限,然後hbase shell會啟動失敗並拋出“java.lang.UnsatisfiedLinkError”錯誤。

解決辦法

  1. 取消/tmp的noexec權限(不推薦)
  2. 設置java.io.tmpdir變量,指向可用的路徑,編輯conf/hbase-env.sh文件:
export HBASE_TMP_DIR=/opt/local/hbase/temp
export HBASE_OPTS="-XX:+UseConcMarkSweepGC -Djava.io.tmpdir=$HBASE_TMP_DIR"

參考文檔

HBase 官方權威指南

HBase 單機模式搭建

HBase 深入淺出
較詳細介紹了HBase的由來以及特性,文中提供了HBase集群、存儲機制的一些簡介,非常適合入門閱讀

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

平板收購,iphone手機收購,二手筆電回收,二手iphone收購-全台皆可收購

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

RocketMQ一個新的消費組初次啟動時從何處開始消費呢?

目錄

@(本文目錄)

1、拋出問題

一個新的消費組訂閱一個已存在的Topic主題時,消費組是從該Topic的哪條消息開始消費呢?

首先翻閱DefaultMQPushConsumer的API時,setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)API映入眼帘,從字面意思來看是設置消費者從哪裡開始消費,正是解開該問題的”鑰匙“。ConsumeFromWhere枚舉類圖如下:

  • CONSUME_FROM_MAX_OFFSET
    從消費隊列最大的偏移量開始消費。
  • CONSUME_FROM_FIRST_OFFSET
    從消費隊列最小偏移量開始消費。
  • CONSUME_FROM_TIMESTAMP
    從指定的時間戳開始消費,默認為消費者啟動之前的30分鐘處開始消費。可以通過DefaultMQPushConsumer#setConsumeTimestamp。

是不是點小激動,還不快試試。

需求:新的消費組啟動時,從隊列最後開始消費,即只消費啟動后發送到消息服務器后的最新消息。

1.1 環境準備

本示例所用到的Topic路由信息如下:

Broker的配置如下(broker.conf)

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store
storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store/commitlog
namesrvAddr=127.0.0.1:9876
autoCreateTopicEnable=false
mapedFileSizeCommitLog=10240
mapedFileSizeConsumeQueue=2000

其中重點修改了如下兩個參數:

  • mapedFileSizeCommitLog
    單個commitlog文件的大小,這裏使用10M,方便測試用。
  • mapedFileSizeConsumeQueue
    單個consumequeue隊列長度,這裏使用1000,表示一個consumequeue文件中包含1000個條目。

1.2 消息發送者代碼

public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for (int i = 0; i < 300; i++) {
        try {
            Message msg = new Message("TopicTest" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
    }
    producer.shutdown();
}

通過上述,往TopicTest發送300條消息,發送完畢后,RocketMQ Broker存儲結構如下:

1.3 消費端驗證代碼

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_01");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

執行上述代碼后,按照期望,應該是不會消費任何消息,只有等生產者再發送消息后,才會對消息進行消費,事實是這樣嗎?執行效果如圖所示:

令人意外的是,竟然從隊列的最小偏移量開始消費了,這就“尷尬”了。難不成是RocketMQ的Bug。帶着這個疑問,從源碼的角度嘗試來解讀該問題,並指導我們實踐。

2、探究CONSUME_FROM_MAX_OFFSET實現原理

對於一個新的消費組,無論是集群模式還是廣播模式都不會存儲該消費組的消費進度,可以理解為-1,此時就需要根據DefaultMQPushConsumer#consumeFromWhere屬性來決定其從何處開始消費,首先我們需要找到其對應的處理入口。我們知道,消息消費者從Broker服務器拉取消息時,需要進行消費隊列的負載,即RebalanceImpl。

溫馨提示:本文不會詳細介紹RocketMQ消息隊列負載、消息拉取、消息消費邏輯,只會展示出通往該問題的簡短流程,如想詳細了解消息消費具體細節,建議購買筆者出版的《RocketMQ技術內幕》書籍。

RebalancePushImpl#computePullFromWhere

public long computePullFromWhere(MessageQueue mq) {
        long result = -1;                                                                                                                                                                                                                  // @1
        final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();    
        final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
        switch (consumeFromWhere) {
            case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
            case CONSUME_FROM_MIN_OFFSET:
            case CONSUME_FROM_MAX_OFFSET:
            case CONSUME_FROM_LAST_OFFSET: {                                                                                                                                                                // @2
               // 省略部分代碼
                break;
            }
            case CONSUME_FROM_FIRST_OFFSET: {                                                                                                                                                              // @3
                // 省略部分代碼
                break;
            }
            case CONSUME_FROM_TIMESTAMP: {                                                                                                                                                                  //@4
                // 省略部分代碼
                break;
            }
            default:
                break;
        }
        return result;                                                                                                                                                                                                                  // @5
    }

代碼@1:先解釋幾個局部變量。

  • result
    最終的返回結果,默認為-1。
  • consumeFromWhere
    消息消費者開始消費的策略,即CONSUME_FROM_LAST_OFFSET等。
  • offsetStore
    offset存儲器,消費組消息偏移量存儲實現器。

代碼@2:CONSUME_FROM_LAST_OFFSET(從隊列的最大偏移量開始消費)的處理邏輯,下文會詳細介紹。

代碼@3:CONSUME_FROM_FIRST_OFFSET(從隊列最小偏移量開始消費)的處理邏輯,下文會詳細介紹。

代碼@4:CONSUME_FROM_TIMESTAMP(從指定時間戳開始消費)的處理邏輯,下文會詳細介紹。

代碼@5:返回最後計算的偏移量,從該偏移量出開始消費。

2.1 CONSUME_FROM_LAST_OFFSET計算邏輯

case CONSUME_FROM_LAST_OFFSET: {
    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);   // @1
    if (lastOffset >= 0) {                                                                                                             // @2
        result = lastOffset;
    }
    // First start,no offset
    else if (-1 == lastOffset) {                                                                                                  // @3
        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {               
            result = 0L;
        } else {
            try {
                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);                     
            } catch (MQClientException e) {                                                                              // @4
                result = -1;
            }
        }
    } else {
        result = -1;    
    }
    break;
}

代碼@1:使用offsetStore從消息消費進度文件中讀取消費消費進度,本文將以集群模式為例展開。稍後詳細分析。

代碼@2:如果返回的偏移量大於等於0,則直接使用該offset,這個也能理解,大於等於0,表示查詢到有效的消息消費進度,從該有效進度開始消費,但我們要特別留意lastOffset為0是什麼場景,因為返回0,並不會執行CONSUME_FROM_LAST_OFFSET(語義)。

代碼@3:如果lastOffset為-1,表示當前並未存儲其有效偏移量,可以理解為第一次消費,如果是消費組重試主題,從重試隊列偏移量為0開始消費;如果是普通主題,則從隊列當前的最大的有效偏移量開始消費,即CONSUME_FROM_LAST_OFFSET語義的實現。

代碼@4:如果從遠程服務拉取最大偏移量拉取異常或其他情況,則使用-1作為第一次拉取偏移量。

分析,上述執行的現象,雖然設置的是CONSUME_FROM_LAST_OFFSET,但現象是從隊列的第一條消息開始消費,根據上述源碼的分析,只有從消費組消費進度存儲文件中取到的消息偏移量為0時,才會從第一條消息開始消費,故接下來重點分析消息消費進度存儲器(OffsetStore)在什麼情況下會返回0。

接下來我們將以集群模式來查看一下消息消費進度的查詢邏輯,集群模式的消息進度存儲管理器實現為:
RemoteBrokerOffsetStore,最終Broker端的命令處理類為:ConsumerManageProcessor。

ConsumerManageProcessor#queryConsumerOffset
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
    final QueryConsumerOffsetResponseHeader responseHeader =
        (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
    final QueryConsumerOffsetRequestHeader requestHeader =
        (QueryConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

    long offset =
        this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());    // @1

    if (offset >= 0) {                                                                                                                                          // @2
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    } else {                                                                                                                                                       // @3
        long minOffset =
            this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
                requestHeader.getQueueId());                                                                                                     // @4
        if (minOffset <= 0
            && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(                                // @5
            requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
            responseHeader.setOffset(0L);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        } else {                                                                                                                                                 // @6
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        }
    }
    return response;
}

代碼@1:從消費消息進度文件中查詢消息消費進度。

代碼@2:如果消息消費進度文件中存儲該隊列的消息進度,其返回的offset必然會大於等於0,則直接返回該偏移量該客戶端,客戶端從該偏移量開始消費。

代碼@3:如果未從消息消費進度文件中查詢到其進度,offset為-1。則首先獲取該主題、消息隊列當前在Broker服務器中的最小偏移量(@4)。如果小於等於0(返回0則表示該隊列的文件還未曾刪除過)並且其最小偏移量對應的消息存儲在內存中而不是存在磁盤中,則返回偏移量0,這就意味着ConsumeFromWhere中定義的三種枚舉類型都不會生效,直接從0開始消費,到這裏就能解開其謎團了(@5)。

代碼@6:如果偏移量小於等於0,但其消息已經存儲在磁盤中,此時返回未找到,最終RebalancePushImpl#computePullFromWhere中得到的偏移量為-1。

看到這裏,大家應該能回答文章開頭處提到的問題了吧?

看到這裏,大家應該明白了,為什麼設置的CONSUME_FROM_LAST_OFFSET,但消費組是從消息隊列的開始處消費了吧,原因就是消息消費進度文件中並沒有找到其消息消費進度,並且該隊列在Broker端的最小偏移量為0,說的更直白點,consumequeue/topicName/queueNum的第一個消息消費隊列文件為00000000000000000000,並且消息其對應的消息緩存在Broker端的內存中(pageCache),其返回給消費端的偏移量為0,故會從0開始消費,而不是從隊列的最大偏移量處開始消費。

為了知識體系的完備性,我們順便來看一下其他兩種策略的計算邏輯。

2.2 CONSUME_FROM_FIRST_OFFSET

case CONSUME_FROM_FIRST_OFFSET: {
    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);   // @1
    if (lastOffset >= 0) {    // @2
        result = lastOffset;
    } else if (-1 == lastOffset) {  // @3
        result = 0L;
    } else {                                  
        result = -1;                    // @4
    }
    break;
}

從隊列的開始偏移量開始消費,其計算邏輯如下:
代碼@1:首先通過偏移量存儲器查詢消費隊列的消費進度。

代碼@2:如果大於等於0,則從當前該偏移量開始消費。

代碼@3:如果遠程返回-1,表示並沒有存儲該隊列的消息消費進度,從0開始。

代碼@4:否則從-1開始消費。

2.4 CONSUME_FROM_TIMESTAMP

從指定時戳后的消息開始消費。

case CONSUME_FROM_TIMESTAMP: {
    ong lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);   // @1
    if (lastOffset >= 0) {                                                                                                            // @2
        result = lastOffset;
    } else if (-1 == lastOffset) {                                                                                                 // @3
        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            try {
                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
            } catch (MQClientException e) {
                result = -1;
            }
        } else {
            try {
                long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                    UtilAll.YYYYMMDDHHMMSS).getTime();
                result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
            } catch (MQClientException e) {
                result = -1;
            }
        }
    } else {
        result = -1;
    }
    break;
}

其基本套路與CONSUME_FROM_LAST_OFFSET一樣:
代碼@1:首先通過偏移量存儲器查詢消費隊列的消費進度。

代碼@2:如果大於等於0,則從當前該偏移量開始消費。

代碼@3:如果遠程返回-1,表示並沒有存儲該隊列的消息消費進度,如果是重試主題,則從當前隊列的最大偏移量開始消費,如果是普通主題,則根據時間戳去Broker端查詢,根據查詢到的偏移量開始消費。

原理就介紹到這裏,下面根據上述理論對其進行驗證。

3、猜想與驗證

根據上述理論分析我們得知設置CONSUME_FROM_LAST_OFFSET但並不是從消息隊列的最大偏移量開始消費的“罪魁禍首”是因為消息消費隊列的最小偏移量為0,如果不為0,則就會符合預期,我們來驗證一下這個猜想。
首先我們刪除commitlog目錄下的文件,如圖所示:

其消費隊列截圖如下:

消費端的驗證代碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_02");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

運行結果如下:

並沒有消息存在的消息,符合預期。

4、解決方案

如果在生產環境下,一個新的消費組訂閱一個已經存在比較久的topic,設置CONSUME_FROM_MAX_OFFSET是符合預期的,即該主題的consumequeue/{queueNum}/fileName,fileName通常不會是00000000000000000000,如是是上面文件名,想要實現從隊列的最後開始消費,該如何做呢?那就走自動創建消費組的路子,執行如下命令:

./mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g my_consumer_05

//克隆一個訂閱了該topic的消費組消費進度
./mqadmin cloneGroupOffset -n 127.0.0.1:9876 -s my_consumer_01 -d my_consumer_05 -t TopicTest

//重置消費進度到當前隊列的最大值
./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g my_consumer_05 -t TopicTest -s -1

按照上上述命令后,即可實現其目的。

您都看到這裏了,麻煩幫忙點個贊,謝謝您的認可與鼓勵。

作者介紹:
丁威,《RocketMQ技術內幕》作者,RocketMQ 社區佈道師,公眾號: 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。歡迎加入我的知識星球,構建一個高質量的技術交流社群。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

Spring Security之多次登錄失敗后賬戶鎖定功能的實現

在上一次寫的文章中,為大家說到了如何動態的從數據庫加載用戶、角色、權限信息,從而實現登錄驗證及授權。在實際的開發過程中,我們通常會有這樣的一個需求:當用戶多次登錄失敗的時候,我們應該將賬戶鎖定,等待一定的時間之後才能再次進行登錄操作。

一、基礎知識回顧

要實現多次登錄失敗賬戶鎖定的功能,我們需要先回顧一下基礎知識:

  • Spring Security 不需要我們自己實現登錄驗證邏輯,而是將用戶、角色、權限信息以實現UserDetails和UserDetailsService接口的方式告知Spring Security。具體的登錄驗證邏輯Spring Security 會幫助我們實現。
  • UserDetails接口中有一個方法叫做isAccountNonLocked()用於判斷賬號是否被鎖定,也就是說我們應該通過該方法對應的set方法setAccountNonLocked(false)告知Spring Security該登錄賬戶被鎖定。
  • 那麼應該在哪裡判斷賬號登錄失敗的次數並執行鎖定機制呢?當然是我們之前文章給大家介紹的《自定義登錄成功及失敗結果處理》的AuthenticationFailureHandler。

建議您先閱讀本文,如果您對本文的實現過程感到迷惑,建議您再翻看本號之前的相關內容。

二、實現多次登錄失敗鎖定的原理

一般來說實現這個需求,我們需要針對每一個用戶記錄登錄失敗的次數nLock和鎖定賬戶的到期時間releaseTime。具體你是把這2個信息存儲在mysql、還是文件中、還是redis中等等,完全取決於你對你所處的應用架構適用性的判斷。具體的實現邏輯無非就是:

  • 登陸失敗之後,從存儲中將nLock取出來加1。
  • 如果nLock大於登陸失敗閾值(比如3次),則將nLock=0,然後設置releaseTime為當前時間加上鎖定周期。通過setAccountNonLocked(false)告知Spring Security該登錄賬戶被鎖定。
  • 如果nLock小於等於1,則將nLock再次存起來。
  • 在一個合適的時機,將鎖定狀態重置為setAccountNonLocked(true)。

這是一種非常典型的實現方式,筆者向大家介紹一款非常有用的開源軟件叫做:ratelimitj。這個軟件的功能主要是為API訪問進行限流,也就是說可以通過制定規則限制API接口的訪問頻率。那恰好登錄驗證接口也是API的一種啊,我們正好也需要限制它在一定的時間內的訪問次數。

三、具體實現

首先需要將ratelimitj通過maven坐標引入到我們的應用裏面來。我們使用的是內存存儲的版本,還有redis存儲的版本,大家可以根據自己的應用情況選用。

        <dependency>
            <groupId>es.moki.ratelimitj</groupId>
            <artifactId>ratelimitj-inmemory</artifactId>
            <version>0.4.1</version>
        </dependency>

之後通過繼承SimpleUrlAuthenticationFailureHandler ,實現onAuthenticationFailure方法。該實現是針對登錄失敗的結果的處理,在我們之前的文章中已經講過。

@Component
public class MyAuthenticationFailureHandler extends SimpleUrlAuthenticationFailureHandler {

    @Autowired
    UserDetailsManager userDetailsManager;

    //規則定義:1小時之內5次機會,就觸發限流行為
    Set<RequestLimitRule> rules = 
            Collections.singleton(RequestLimitRule.of(1 * 60, TimeUnit.MINUTES,5)); 
    RequestRateLimiter limiter = new InMemorySlidingWindowRequestRateLimiter(rules);


    @Override
    public void onAuthenticationFailure(HttpServletRequest request,
                                        HttpServletResponse response, 
                                        AuthenticationException exception) 
                                        throws IOException, ServletException {

         String userId = //從request或request.getSession中獲取登錄用戶名
         //計數器加1,並判斷該用戶是否已經到了觸發了鎖定規則
         boolean reachLimit = limiter.overLimitWhenIncremented(userId);

        if(reachLimit){ //如果觸發了鎖定規則,通過UserDetails告知Spring Security鎖定賬戶
               user.setAccountNonLocked(false);
               userDetailsManager.updateUser(user);
               SysUser user = (SysUser) userDetailsManager.loadUserByUsername(userId);
        }
        

        
        //此處省略通過response做json或html響應
    }
}
  • 核心實現注意看代碼中的註釋
  • 代碼中的SysUser為UserDetails的實現類,如果不知道如何實現請參考本號之前的文章
  • userDetailsManager被用於管理UserDetails信息,通過改變UserDetails改變Spring Security驗證行為。

四、重置鎖定狀態的時機

user.setAccountNonLocked(true);

重置鎖定狀態很簡單,就是上面的代碼。但是更重要的是如何選擇重置鎖定狀態的時機。筆者能想到幾種方案如下

  • 下一次登陸的時候,自定義過濾器,加在Spring Boot過濾器鏈最前端做鎖定狀態重置的判斷。
  • 當登錄賬戶被鎖定之後,之後用戶的每一次登錄都會拋出LockedException。我們完全可以通過Spring Boot的全局異常捕獲機制,在其中捕獲LockedException,並做鎖定狀態的判斷及重置行為。
  • 寫一個Spring 的定時器輪詢,當然這是最差的方案。

    期待您的關注

  • 向您推薦博主的系列文檔:
  • 本文轉載註明出處(必須帶連接,不能只轉文字):。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※高價收購3C產品,價格不怕你比較

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

3c收購,鏡頭 收購有可能以全新價回收嗎?

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

Head First設計模式——適配器和外觀模式,Head First設計模式——裝飾者模式

前言:為什麼要一次講解這兩個模式,說點騷話:因為比較簡單(*^_^*),其實是他們兩個有相似和有時候我們容易搞混概念。

講到這兩個設計模式與另外一個“裝飾者模式”也有相似,他們三個按照結構模式分類都屬於“結構性模式”,所有我們接下來就來看什麼是適配器模式和外觀模式。

另外裝飾模式可以看我的另一篇博文→

一、適配器模式

適配器對應到我們現實生活中的例子,最典型的就是插頭接口適配器,比如我們買的有些港版手機充電頭是圓形三角插頭,而大陸的三角電源插板插不進去港版的插頭。

這時候我們就會在某寶上買個轉接頭轉換一下,而這個轉接頭就是適配器,用它來適配港版手機充電頭讓他能夠插入到我們的電源插板裏面。

在設計模式中這個適配器是什麼,用程序如何表現,先讓我舉個栗子:我們有一隻鴨子,一隻雞,我們如何通過適配器轉換鴨和雞。

鴨子有很多種,我們定義一個鴨子的接口,然後以綠頭鴨為例。關於這個綠頭鴨在策略模式也有用到,可以看看我另一篇綠頭鴨如何攪動策略模式→

    public  interface Duck
    {
        //叫
        public void Quack();
        //飛
        public void Fly();
    }

    public class GreenDuck : Duck
    {
        public void Fly()
        {
            Console.WriteLine("綠頭鴨,飛");
        }

        public void Quack()
        {
            Console.WriteLine("綠頭鴨,呱呱叫");
        }
    }

  同樣我們定義一個雞的接口,和一隻母雞的類

    public  interface Chicken
    {
        //叫
        public void Gobble();
        //飛
        public void Fly();
    }

    public class Hen : Chicken
    {
       
        public void Gobble()
        {
            Console.WriteLine("母雞,咯咯叫");
        }

        public void Fly()
        {
            Console.WriteLine("母雞,飛");
        }

    }

  鴨子和母雞的叫聲不一樣,現在我們讓母雞來冒充鴨子,利用適配器模式如何做。 直接看代碼吧

    /// <summary>
    /// 母雞適配器
    /// 適配母雞讓它變成鴨子
    /// </summary>
    public class HenAdapter : Duck
    {
        Chicken chicken;
        public HenAdapter(Chicken chicken)
        {
            this.chicken = chicken;
        }
        public void Quack()
        {
            //調用母雞咯咯叫
            chicken.Gobble();
        }

        public void Fly()
        {
            //調用母雞飛
            chicken.Fly();
        }

    }

  測試母雞適配器

如上我們使用母雞適配器將母雞適配成了鴨子,鴨子也可以用適配器將鴨子適配成母雞,適配器模式定義:

適配器模式:將一個類的接口,裝換成客戶期望的另一個接口。適配器讓原本接口不兼容的類可以合作無間。

與適配器看起來相似的裝飾者模式是包裝對象的行為或責任,裝飾者被包裝后可能會繼續被包裝,他們不裝換接口,而適配器則一定會進行接口的轉換。

適配的工作是將一個接口轉換成另外一個接口,雖然大多數適配器採取的例子都是讓一個適配器包裝一個被適配者,但是有時候我們需要讓一個適配器包裝多個被適配者。

而這實際又涉及到另外一個模式,就是外觀模式,我們常常將適配器模式和外觀模式混為一談,那接着就來講解外觀模式。

二、外觀模式

外觀模式以家庭影院為例,家庭影院有許多組件構成,比如:显示屏、DVD、音響、燈光等等。

當我們要看電影的時候要打開显示屏,打開DVD,打開音響,關閉燈光等一系列動作,將這些動作寫成類方法的調用

            Screen screen = new Screen();
            DVD dvd = new DVD();
            SoundEngineer sound = new SoundEngineer();
            Light light = new Light();

            screen.Down();
            dvd.PlayDVD();
            sound.TurnOn();
            light.TurnOff();

可以看到每次我們要使用就要調用一篇這些方法,如果要關閉呢,我們也需要調用一篇。而我們正需要的就是一個外觀:通過實現一個提供更合理的接口的外觀類。

還是看代碼吧

 public class HomeThreaterFacade
    {
        Screen screen;
        DVD dvd;
        SoundEngineer sound;
        Light light;

        public HomeThreaterFacade(Screen screen, DVD dvd, SoundEngineer sound, Light light)
        {
            this.screen = screen;
            this.dvd = dvd;
            this.sound = sound;
            this.light = light;
        }

        public void WatchMovie()
        {
            Console.WriteLine("開始播放電影......");
            screen.Down();
            dvd.PlayDVD();
            sound.TurnOn();
            light.TurnOff();
        }
    }

由於其他類比較簡單就是一個打印輸出,我就不列出來了,還有關閉方法同理也很簡單就實現了。

還是測試一下效果:

外觀模式定義

外觀模式:提供了一個統一的接口,用來訪問子系統中的一群接口。外觀定義了一個高層接口,讓子系統更容易使用。

外觀模式遵循了一個設計原則

最少知識原則:之和你的密友談話。

這個原則希望我們在設計中,不要讓太多的類耦合在一起,免得修改系統中一部分,會影響其他部分。而外觀模式讓用戶不用關心全部子系統組件,讓客戶變得簡單有彈性。我們可以在不影響客戶的情況下升級外觀模式里的組件,而客戶只有一個朋友,也就是外觀模式。

三、適配器模式與外觀模式區別

從上面例子我們也許會覺得適配器和外觀模式之間的差異在於:適配器包裝一個類,而外觀可以代表許多類

但是實際它們的本質和作用並不是在於包裝多少類,適配器模式將一個或多個接口變成客戶期望的一個接口,我們一般適配一個類,但是特殊需求也可以適配多個類來提供一個接口。類是地,一個外觀也可以只爭對一個複雜接口的類提供簡化接口。兩中模式的差異在於他們的意圖。適配器模式意圖是將接口裝換成不同接口,外觀的意圖是簡化接口。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

平板收購,iphone手機收購,二手筆電回收,二手iphone收購-全台皆可收購

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

Spring框架AOP學習總結(下)

目錄

@
在中主要講的是一些Spring的概述、Spring工廠、Spring屬性注入以及IOC入門,其中最重要的是IOC,上一篇中IOC大概講的小結一下:

然後呢這一篇中主要講一下Spring中除了IOC之外的另一個重要的核心:AOP,在Spring中IOC也好,AOP也好,都必須會二者的XML開發以及註解開發,也就是說IOC和AOP的XML開發以及註解開發都要掌握

1、 AOP 的概述

從專業的角度來講(千萬不要問我有多專業,度娘是我表鍋不對是表嫂QAQ):

在軟件業,AOP為Aspect Oriented Programming的縮寫,意為:面向切面編程,通過預編譯方式和運行期動態代理實現程序功能的統一維護的一種技術。AOP是OOP的延續,是軟件開發中的一個熱點,也是Spring框架中的一個重要內容,是函數式編程的一種衍生范型。利用AOP可以對業務邏輯的各個部分進行隔離,從而使得業務邏輯各部分之間的耦合度降低,提高程序的可重用性,同時提高了開發的效率。

從通俗易懂且不失風趣的角度來講:(來自武哥文章)

面向切面編程的目標就是分離關注點。什麼是關注點呢?就是你要做的事,就是關注點。假如你是個公子哥,沒啥人生目標,天天就是衣來伸手,飯來張口,整天只知道玩一件事!那麼,每天你一睜眼,就光想着吃完飯就去玩(你必須要做的事),但是在玩之前,你還需要穿衣服、穿鞋子、疊好被子、做飯等等等等事情,這些事情就是你的關注點,但是你只想吃飯然後玩,那麼怎麼辦呢?這些事情通通交給別人去干。在你走到飯桌之前,有一個專門的僕人A幫你穿衣服,僕人B幫你穿鞋子,僕人C幫你疊好被子,僕人C幫你做飯,然後你就開始吃飯、去玩(這就是你一天的正事),你幹完你的正事之後,回來,然後一系列僕人又開始幫你干這個干那個,然後一天就結束了!
AOP的好處就是你只需要干你的正事,其它事情別人幫你干。也許有一天,你想裸奔,不想穿衣服,那麼你把僕人A解僱就是了!也許有一天,出門之前你還想帶點錢,那麼你再雇一個僕人D專門幫你干取錢的活!這就是AOP。每個人各司其職,靈活組合,達到一種可配置的、可插拔的程序結構。
從Spring的角度看,AOP最大的用途就在於提供了事務管理的能力。事務管理就是一個關注點,你的正事就是去訪問數據庫,而你不想管事務(太煩),所以,Spring在你訪問數據庫之前,自動幫你開啟事務,當你訪問數據庫結束之後,自動幫你提交/回滾事務!

1、1 為什麼學習 AOP

Spring 的 AOP 的由來:AOP 最早由 AOP 聯盟的組織提出的,制定了一套規範.Spring 將 AOP 思想引入到框架中,必須遵守 AOP 聯盟的規範.

Aop解決實際開發中的一些問題:

  • AOP 解決 OOP 中遇到的一些問題.是 OOP 的延續和擴展.

對程序進行增強:不修改源碼的情況下:

  • AOP 可以進行權限校驗,日誌記錄,性能監控,事務控制.

1、2 AOP底層實現: 代理機制(了解)

Spring 的 AOP 的底層用到兩種代理機制:

  • JDK 的動態代理 :針對實現了接口的類產生代理.
  • Cglib 的動態代理 :針對沒有實現接口的類產生代理. 應用的是底層的字節碼增強的技術 生成當前類的子類對象

spring底層會完成自動代理,實現了接口的類默認使用的是JDK 的動態代理,相反的,沒有實現接口的類默認使用的是Cglib 的動態代理 ,底層代碼可以不懂但這個概念一定要知道,不然會被鄙視的,O(∩_∩)O哈哈~,下面是底層代碼,有興趣的可以了解了解。

JDK 動態代理增強一個類中方法:

public class MyJDKProxy implements InvocationHandler {
        private UserDao userDao;

        public MyJDKProxy(UserDao userDao) {
            this.userDao = userDao;
        }

        // 編寫工具方法:生成代理:
        public UserDao createProxy() {
            UserDao userDaoProxy = (UserDao) Proxy.newProxyInstance(userDao
                    .getClass().getClassLoader(), userDao.getClass()
                    .getInterfaces(), this);
            return userDaoProxy;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args)
                throws Throwable {
            if ("save".equals(method.getName())) {
                System.out.println("權限校驗================");
            }
            return method.invoke(userDao, args);
        }
    }

Cglib 動態代理增強一個類中的方法:

public class MyCglibProxy implements MethodInterceptor {
        private CustomerDao customerDao;

        public MyCglibProxy(CustomerDao customerDao) {
            this.customerDao = customerDao;
        }

        // 生成代理的方法:
        public CustomerDao createProxy() {
            // 創建 Cglib 的核心類:
            Enhancer enhancer = new Enhancer();
            // 設置父類:
            enhancer.setSuperclass(CustomerDao.class);
            // 設置回調:
            enhancer.setCallback(this);
            // 生成代理:
            CustomerDao customerDaoProxy = (CustomerDao) enhancer.create();
            return customerDaoProxy;
        }

        @Override
        public Object intercept(Object proxy, Method method, Object[] args,
                MethodProxy methodProxy) throws Throwable {
            if ("delete".equals(method.getName())) {
                Object obj = methodProxy.invokeSuper(proxy, args);
                System.out.println("日誌記錄================");
                return obj;
            }
            return methodProxy.invokeSuper(proxy, args);
        }
    }

2、 Spring 基於AspectJ 進行 AOP 的開發入門(XML 的方式):

首先,Spring為什麼不直接進行Spring的AOP開發呢,而要基於Aspectj呢,是因為,Spring自己的AOP開發實現方式(傳統的AOP開發)繁瑣且複雜,效率極低,於是傳統的AOP開發基本上棄用了,相反Aspectj的AOP開發效率高,所以AOP開發一般是Spring 的基於 AspectJ 的 AOP 開發。

2.1 AOP 的開發中的相關術語:

Aop是一種非常高深的思想,當然會有非常專業的相關術語了(這彎繞的,你打幾分?)

從專業的角度角度概述定義(相對來說比較枯燥不易理解):

Joinpoint(連接點):所謂連接點是指那些被攔截到的點。在 spring 中,這些點指的是方法,因為 spring 只
支持方法類型的連接點.
Pointcut(切入點):所謂切入點是指我們要對哪些 Joinpoint 進行攔截的定義.
Advice(通知/增強):所謂通知是指攔截到 Joinpoint 之後所要做的事情就是通知.通知分為前置通知,後置
通知,異常通知,最終通知,環繞通知(切面要完成的功能)
Introduction(引介):引介是一種特殊的通知在不修改類代碼的前提下, Introduction 可以在運行期為類
動態地添加一些方法或 Field.
Target(目標對象):代理的目標對象
Weaving(織入):是指把增強應用到目標對象來創建新的代理對象的過程.
spring 採用動態代理織入,而 AspectJ 採用編譯期織入和類裝在期織入
Proxy(代理):一個類被 AOP 織入增強后,就產生一個結果代理類
Aspect(切面): 是切入點和通知(引介)的結合

基於專業的角度實例分析(相對來說易理解,什麼?畫質差?咳咳…1080p藍光畫質…哎哎哎..大哥..別打…別打…別打臉):

2.2引入相應的 jar 包

引入jar包:基礎六個jar包、AOP聯盟jar包、spring的AOPjar包、aspectJ的jar包、spring整合aspectj的jar包

  • spring 的傳統 AOP 的開發的包
    spring-aop-4.2.4.RELEASE.jar
    com.springsource.org.aopalliance-1.0.0.jar

  • aspectJ 的開發包:
    com.springsource.org.aspectj.weaver-1.6.8.RELEASE.jar
    spring-aspects-4.2.4.RELEASE.jar

    2.3 引入 Spring 的配置文件

    引入 AOP 約束:

 <beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop" xsi:schemaLocation="
http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop 
http://www.springframework.org/schema/aop/spring-aop.xsd">
</beans>

2.4 編寫目標類

創建接口和類:

    public interface OrderDao {
        public void save();

        public void update();

        public void delete();

        public void find();
    }

    public class OrderDaoImpl implements OrderDao {
        @Override
        public void save() {
            System.out.println("保存訂單...");
        }

        @Override
        public void update() {
            System.out.println("修改訂單...");
        }

        @Override
        public void delete() {
            System.out.println("刪除訂單...");
        }

        @Override
        public void find() {
            System.out.println("查詢訂單...");
        }
    }

2.5 目標類的XML配置

<!-- 目標類配置:被增強的類 --> 
<bean id="orderDao" class="com.gx.spring.demo3.OrderDaoImpl"></bean>

2.6 整合 Junit 單元測試

前提:引入 spring-test.jar 測試的jar包,整合 Junit 單元測試之後就不需要每次都重複註冊工廠,只要固定格式在測試類上寫兩個註解,需要的屬性直接注入,之後只關心自己的測試類即可

//固定註解寫法(前提:引入 spring-test.jar 測試的jar包)
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration("classpath:applicationContext.xml")
    public class SpringDemo3 {
        @Resource(name = "orderDao")  //需要的屬性直接注入(前提:引入 spring-test.jar 測試的jar包)
        private OrderDao orderDao;

        @Test
        public void demo1() {
            orderDao.save();
            orderDao.update();
            orderDao.delete();
            orderDao.find();
        }
    }

運行demo出現如下效果:

2.7 通知類型

到這裏,就需要需要對通知類型了解一下(前三者常用):

前置通知 :在目標方法執行之前執行.

後置通知 :在目標方法執行之後執行

如果要獲得後置通知中的返回值,必須注意的是:

環繞通知 :在目標方法執行前和執行后執行

異常拋出通知:在目標方法執行出現 異常的時候 執行
最終通知 :無論目標方法是否出現異常 最終通知都會 執行.

通知類型XML配置

2.8 切入點表達式

execution(表達式)

表達式 : [方法訪問修飾符] 方法返回值 包名.類名.方法名(方法的參數)

切入點表達式所以就是execution( [方法訪問修飾符] 方法返回值 包名.類名.方法名(方法的參數))

其中 [ ] 中的方法訪問修飾符可有可無

切入點表達式各類型例子:

public * com.gx.spring.dao. * .*(..)
com.gx.spring.dao.*.*(..)
com.gx.spring.dao.UserDao+.*(..)
com.gx.spring.dao..*.*(..)

2.9 編寫一個切面類

好了,了解了通知類型以及切入點表達式之後就可以來 編寫一個切面類玩起來了QAQ

public class MyAspectXml {
    // 前置增強
    public void before(){
       System.out.println("前置增強===========");
} }

2.10 配置完成增強

<!-- 配置切面類 --> 
<bean id="myAspectXml" class="com.gx.spring.demo3.MyAspectXml"></bean>
<!-- 進行 aop 的配置 --> 
<aop:config>
<!-- 配置切入點表達式:哪些類的哪些方法需要進行增強 -->
 <aop:pointcut expression="execution(* com.gx.spring.demo3.OrderDao.save(..))" id="pointcut1"/>
<!-- 配置切面 --> 
<aop:aspect ref="myAspectXml"> 
    <aop:before method="before" pointcut-ref="pointcut1"/>
</aop:aspect>
</aop:config>

需要注意的點我都規劃出來了(不用誇我,我知道我長得帥QnQ)

2.11 其他的增強的配置:

<!-- 配置切面類 -->
 <bean id="myAspectXml" class="com.gx.demo3.MyAspectXml"></bean>
    <!-- 進行 aop 的配置 -->
 <aop:config>
    <!-- 配置切入點表達式:哪些類的哪些方法需要進行增強 -->
     <aop:pointcut expression="execution(* com.gx.spring.demo3.*Dao.save(..))" id="pointcut1"/>
     <aop:pointcut expression="execution(* com.gx.spring.demo3.*Dao.delete(..))" id="pointcut2"/>
     <aop:pointcut expression="execution(* com.gx.spring.demo3.*Dao.update(..))" id="pointcut3"/>
     <aop:pointcut expression="execution(* com.gx.spring.demo3.*Dao.find(..))" id="pointcut4"/>
    <!-- 配置切面 --> 
    <aop:aspect ref="myAspectXml">
       <aop:before method="before" pointcut-ref="pointcut1"/>
       <aop:after-returning method="afterReturing"pointcut-ref="pointcut2"/>
       <aop:around method="around" pointcut-ref="pointcut3"/>
       <aop:after-throwing method="afterThrowing" pointcut-ref="pointcut4"/>
       <aop:after method="after" pointcut-ref="pointcut4"/>
    </aop:aspect>
</aop:config>

3、Spring 基於AspectJ 進行 AOP 的開發入門(註解的方式):

3.1創建項目,引入jar包

引入的jar包如下:

3.2引入配置文件

3.3編寫目標類並配置

編寫目標類:

package com.gx.spring.demo1;

public class OrderDao {

    public void save(){
        System.out.println("保存訂單...");
    }
    public void update(){
        System.out.println("修改訂單...");
    }
    public String delete(){
        System.out.println("刪除訂單...");
        return "鄢寒";
    }
    public void find(){
        System.out.println("查詢訂單...");
    }
}

XML配置:

<!-- 配置目標類 -->
    <bean id="orderDao" class="com.gx.spring.demo1.OrderDao">

    </bean>

3.4編寫切面類並配置

編寫切面類

package com.gx.spring.demo1;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;

/**
 * 切面類:註解的切面類
 * @author jt
 */
public class MyAspectAnno {

    public void before(){
        System.out.println("前置增強===========");
    }
}

XML配置:

<!-- 配置切面類 -->
    <bean id="myAspect" class="com.gx.spring.demo1.MyAspectAnno">
    
    </bean>

3.5使用註解的AOP對象目標類進行增強

1、在配置文件中打開註解的AOP開發

<!-- 在配置文件中開啟註解的AOP的開發 -->
    <aop:aspectj-autoproxy/>

2、在切面類上使用註解
在類上使用@Aspect註解代表這是一個切面類
在方法上注入屬性@Before(execution表達式)代表前置增強

@Aspect
public class MyAspectAnno {

    @Before(value="execution(* com.gx.spring.demo1.OrderDao.save(..))")
    public void before(){
        System.out.println("前置增強===========");
    }
}

3.6編寫測試類

package com.gx.spring.demo1;

import javax.annotation.Resource;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

/**
 * Spring的AOP的註解開發
 *
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("classpath:applicationContext.xml")
public class SpringDemo1 {
    @Resource(name="orderDao")
    private static OrderDao orderDao;
    
    public static void main(String[] args) {
        
            orderDao.save();
            orderDao.update();
            orderDao.delete();
            orderDao.find();
        
    }
    
}

測試結果:

4、Spring的註解的AOP的通知類型

4.1@Before :前置通知

@Aspect
public class MyAspectAnno {

    @Before(value="execution(* com.gx.spring.demo1.OrderDao.save(..))")
    public void before(){
        System.out.println("前置增強===========");
    }
}

4.2@AfterReturning :後置通知

後置通知可以獲取方法返回值

// 後置通知:
    @AfterReturning(value="execution(* com.gx.spring.demo1.OrderDao.save(..))")
    public void afterReturning(Object result){
        System.out.println("後置增強==========="+result);
    }

借用一下XML方式的圖,意思意思啦,意思還是那個意思QnQ

4.3@Around :環繞通知

// 環繞通知:
    @Around(value="execution(* com.gx.spring.demo1.OrderDao.save(..))")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable{
        System.out.println("環繞前增強==========");
        Object obj  = joinPoint.proceed();
        System.out.println("環繞后增強==========");
        return obj;
    }

4.4@AfterThrowing :異常拋出通知

測試前記得製造出個異常qnq

// 異常拋出通知:
    @AfterThrowing(value="execution(* com.gx.spring.demo1.OrderDao.save(..))" throwing="e")
    public void afterThrowing(Throwable e){
        System.out.println("異常拋出增強========="+e.getMessage());
    }

4.5@After :最終通知

// 最終通知
    @After(value="execution(* com.gx.spring.demo1.OrderDao.save(..))")
    public void after(){
        System.out.println("最終增強============");
    }

5、Spring的註解的AOP的切入點的配置

首先,我們發現在Spring 基於AspectJ 進行 AOP 的開發入門(註解的方式)的過程中如果方法過多,通知過多並且作用於一個方法,需求一改變就需要更改相應的源代碼,為了更好的維護,於是有了AOP的切入點的配置,AOP的切入點的配置能很好地決絕改問題!只需要管理AOP的切入點的配置即可!

具體代碼如下:

package com.gx.spring.demo1;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;

/**
 * 切面類:註解的切面類
 * @author jt
 */
@Aspect
public class MyAspectAnno {
    // 前置通知:
    @Before(value="MyAspectAnno.pointcut2()")
    public void before(){
        System.out.println("前置增強===========");
    }
    
    // 後置通知:
    @AfterReturning(value="MyAspectAnno.pointcut4()",returning="result")
    public void afterReturning(Object result){
        System.out.println("後置增強==========="+result);
    }
    
    // 環繞通知:
    @Around(value="MyAspectAnno.pointcut3()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable{
        System.out.println("環繞前增強==========");
        Object obj  = joinPoint.proceed();
        System.out.println("環繞后增強==========");
        return obj;
    }
    
    // 異常拋出通知:
    @AfterThrowing(value="MyAspectAnno.pointcut1()",throwing="e")
    public void afterThrowing(Throwable e){
        System.out.println("異常拋出增強========="+e.getMessage());
    }
    
    // 最終通知
    @After(value="MyAspectAnno.pointcut1()")
    public void after(){
        System.out.println("最終增強============");
    }
    
    // 切入點註解:
    @Pointcut(value="execution(* com.gx.spring.demo1.OrderDao.find(..))")
    private void pointcut1(){}
    @Pointcut(value="execution(* com.gx.spring.demo1.OrderDao.save(..))")
    private void pointcut2(){}
    @Pointcut(value="execution(* com.gx.spring.demo1.OrderDao.update(..))")
    private void pointcut3(){}
    @Pointcut(value="execution(* com.gx.spring.demo1.OrderDao.delete(..))")
    private void pointcut4(){}
}

如果本文對你有一點點幫助,那麼請點個讚唄,謝謝~

最後,若有不足或者不正之處,歡迎指正批評,感激不盡!如果有疑問歡迎留言,絕對第一時間回復!

歡迎各位關注我的公眾號,一起探討技術,嚮往技術,追求技術,說好了來了就是盆友喔…

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品在網路上成為最夯、最多人討論的話題?

※高價收購3C產品,價格不怕你比較

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

中國動力電池技術突破,2025 年電動車成本效益比料勝燃油車

 

21 世紀經濟報導,中國電動汽車百人會理事長陳清泰表示,過去一年,中國電動汽車產業正在向高品質發輾轉型,發展形勢良好;而電動汽車再往前發展要跨越一個臨界點,就是電動車的成本效益比達到和超過燃油車,他預期這個臨界點會可能會在 2025 年前後出現。   中國 2017 年新能源汽車銷量目標 70 萬輛,去年 1 到 11 月累計銷售 60.9 萬輛、年增 51.4%。專家預測,中國去年新能源汽車總銷量可能超過 80 萬輛。中國汽車工業協會秘書長助理許海東日前表示,中國去年新能源汽車 70 萬輛銷量目標應可達成,2018 年新能源車銷量增速仍可保持 40% 至 50%,預期銷量將超過 100 萬輛。   陳清泰指出,電動汽車再往前發展要跨越一個臨界點,就是電動車的成本效益比達到和超過燃油車,如果跨過了這個門檻,電動車就可依託市場力量自主發展了,目前仍需靠政策、靠政府補貼。他亦預期,當財政補貼完全取消後,雙積分政策作為替代政策,新能源汽車積分比例將在 2020 年 12% 的基礎上繼續上調。   對於上述臨界點會出現在什麼時候?陳清泰的判斷是,大約在 2025 年前後。對此,他建議中國車企要在以下幾個方面做好準備,首先是在財政補貼退坡之後,做好可持續發展的保障工作,另外電動車自身要透過輕量化、節能化提高成本效益比;其次,產品技術要雙線作戰,其中一條戰線是完善汽車的行駛功能,另一條戰線則是將車聯網和共享化應用到新能源汽車上;第三,自動駕駛是爭奪未來的一個重點;第四,在有限的時間要加緊做品牌建設。   中國電動汽車百人會執行副理事長歐陽明高表示,2017 年中國動力電池技術已取得實質性進展,動力電池系統能量密度已達到 150 瓦時/公斤甚至以上,鋰離子動力電池單體比能量有望於 2020 年前實現 300 瓦時/公斤目標。   他進一步指出,2025 年,具備一般成本效益比的純電動轎車合理的里程是 300 到 400 公里;但 2030 年,最大的技術突破將體現在電解質上,固態電池會規模產業化,電池單體比能量可望觸及 500 瓦時/公斤;2030 年常規的成本效益比車型,續航里程應該可達到 500 公里以上。   (本文內容由授權使用。首圖來源:)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

平板收購,iphone手機收購,二手筆電回收,二手iphone收購-全台皆可收購

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

一文帶你深入了解 redis 複製技術及主從架構

主從架構可以說是互聯網必備的架構了,第一是為了保證服務的高可用,第二是為了實現讀寫分離,你可能熟悉我們常用的 MySQL 數據庫的主從架構,對於我們 redis 來說也不意外,redis 數據庫也有各種各樣的主從架構方式,在主從架構中會涉及到主節點與從節點之間的數據同步,這個數據同步的過程在 redis 中叫做複製,這在篇文章中,我們詳細的聊一聊 redis 的複製技術和主從架構 ,本文主要有以下內容:

  • 主從架構環境搭建
    • 主從架構的建立方式
    • 主從架構的斷開
  • 複製技術的原理
    • 數據同步過程
    • 心跳檢測
  • 主從拓撲架構
    • 一主一從
    • 一主多從
    • 樹狀結構

主從環境搭建

redis 的實例在默認的情況下都是主節點,所以我們需要修改一些配置來搭建主從架構,redis 的主從架構搭建還是比較簡單的,redis 提供了三種方式來搭建主從架構,在後面我們將就介紹,在介紹之前我們要先了解主從架構的特性:在主從架構中有一個主節點(master)和最少一個從節點(slave),並且數據複製是單向的,只能從主節點複製到從節點,不能由從節點到主節點。

主從架構的建立方式

主從架構的建立有以下三種方式:

  • 在 Redis.conf 配置文件中加入 slaveof {masterHost} {masterPort} 命令,隨 Redis 實例的啟動生效
  • 在 redis-server 啟動命令后加入 –slaveof {masterHost} {masterPort} 參數
  • 在 redis-cli 交互窗口下直接使用命令:slaveof {masterHost} {masterPort}

上面三種方式都可以搭建 Redis 主從架構,我們以第一種方式來演示,其他兩種方式自行嘗試,由於是演示,所以就在本地啟動兩個 Redis 實例,並不在多台機器上啟動 redis 的實例了,我們準備一個端口 6379 的主節點實例,準備一個端口 6480 從節點的實例,端口 6480 的 redis 實例配置文件取名為 6480.conf 並且在裏面添加 slaveof 語句,在配置文件最後加入如下一條語句

slaveof 127.0.0.1 6379

分別啟動兩個 redis 實例,啟動之後他們會自動建立主從關係,關於這背後的原理,我們後面在詳細的聊一聊,先來驗證一下我們的主從架構是否搭建成功,我們先在 6379 master 節點上新增一條數據:

然後再 6480 slave 節點上獲取該數據:

可以看出我們在 slave 節點上已經成功的獲取到了在 master 節點新增的值,說明主從架構已經搭建成功了,我們使用 info replication 命令來查看兩個節點的信息,先來看看主節點的信息

可以看出 6379 端口的實例 role 為 master,有一個正在連接的實例,還有其他運行的信息,我們再來看看 6480 端口的 redis 實例信息

可以看出兩個節點之間相互記錄著對象的信息,這些信息在數據複製時候將會用到。在這裡有一點需要說明一下,默認情況下 slave 節點是只讀的,並不支持寫入,也不建議開啟寫入,我們可以驗證一下,在 6480 實例上寫入一條數據

127.0.0.1:6480> set x 3
(error) READONLY You can't write against a read only replica.
127.0.0.1:6480> 

提示只讀,並不支持寫入操作,當然我們也可以修改該配置,在配置文件中 replica-read-only yes 配置項就是用來控制從服務器只讀的,為什麼只能只讀?因為我們知道複製是單向的,數據只能由 master 到 slave 節點,如果在 salve 節點上開啟寫入的話,那麼修改了 slave 節點的數據, master 節點是感知不到的,slave 節點的數據並不能複製到 master 節點上,這樣就會造成數據不一致的情況,所以建議 slave 節點只讀

主從架構的斷開

主從架構的斷開同樣是 slaveof 命令,在從節點上執行 slaveof no one 命令就可以與主節點斷開追隨關係,我們在 6480 節點上執行 slaveof no one 命令

127.0.0.1:6480> slaveof no one
OK
127.0.0.1:6480> info replication
# Replication
role:master
connected_slaves:0
master_replid:a54f3ba841c67762d6c1e33456c97b94c62f6ac0
master_replid2:e5c1ab2a68064690aebef4bd2bd4f3ddfba9cc27
master_repl_offset:4367
second_repl_offset:4368
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:4367
127.0.0.1:6480> 

執行完 slaveof no one 命令之後,6480 節點的角色立馬恢復成了 master ,我們再來看看時候還和 6379 實例連接在一起,我們在 6379 節點上新增一個 key-value

127.0.0.1:6379> set y 3
OK

在 6480 節點上 get y

127.0.0.1:6480> get y
(nil)
127.0.0.1:6480> 

在 6480 節點上獲取不到 y ,因為 6480 節點已經跟 6379 節點斷開的聯繫,不存在主從關係了,slaveof 命令不僅能夠斷開連接,還能切換主服務器,使用命令為 slaveof {newMasterIp} {newMasterPort},我們讓 6379 成為 6480 的從節點, 在 6379 節點上執行 slaveof 127.0.0.1 6480 命令,我們在來看看 6379 的 info replication

127.0.0.1:6379> info replication
# Replication
role:slave
master_host:127.0.0.1
master_port:6480
master_link_status:up
master_last_io_seconds_ago:2
master_sync_in_progress:0
slave_repl_offset:4367
slave_priority:100
slave_read_only:1
connected_slaves:0
master_replid:99624d4b402b5091552b9cb3dd9a793a3005e2ea
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:4367
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:4368
repl_backlog_histlen:0
127.0.0.1:6379> 

6379 節點的角色已經是 slave 了,並且主節點的是 6480 ,我們可以再看看 6480 節點的 info replication

127.0.0.1:6480> info replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6379,state=online,offset=4479,lag=1
master_replid:99624d4b402b5091552b9cb3dd9a793a3005e2ea
master_replid2:a54f3ba841c67762d6c1e33456c97b94c62f6ac0
master_repl_offset:4479
second_repl_offset:4368
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:4479
127.0.0.1:6480> 

在 6480 節點上有 6379 從節點的信息,可以看出 slaveof 命令已經幫我們完成了主服務器的切換。

複製技術的原理

redis 的主從架構好像很簡單一樣,我們就執行了一條命令就成功搭建了主從架構,並且數據複製也沒有問題,使用起來確實簡單,但是這背後 redis 還是幫我們做了很多的事情,比如主從服務器之間的數據同步、主從服務器的狀態檢測等,這背後 redis 是如何實現的呢?接下來我們就一起看看

數據複製原理

我們執行完 slaveof 命令之後,我們的主從關係就建立好了,在這個過程中, master 服務器與 slave 服務器之間需要經歷多個步驟,如下圖所示:

slaveof 命令背後,主從服務器大致經歷了七步,其中權限驗證這一步不是必須的,為了能夠更好的理解這些步驟,就以我們上面搭建的 redis 實例為例來詳細聊一聊各步驟。

1、保存主節點信息

在 6480 的客戶端向 6480 節點服務器發送 slaveof 127.0.0.1 6379 命令時,我們會立馬得到一個 OK

127.0.0.1:6480> slaveof 127.0.0.1 6379
OK
127.0.0.1:6480> 

這時候數據複製工作並沒有開始,數據複製工作是在返回 OK 之後才開始執行的,這時候 6480 從節點做的事情是將給定的主服務器 IP 地址 127.0.0.1 以及端口 6379 保存到服務器狀態的 masterhost 屬性和 masterport 屬性裏面

2、建立 socket 連接

在 slaveof 命令執行完之後,從服務器會根據命令設置的 IP 地址和端口,跟主服務器創建套接字連接, 如果從服務器能夠跟主服務器成功的建立 socket 連接,那麼從服務器將會為這個 socket 關聯一個專門用於處理複製工作的文件事件處理器,這個處理器將負責後續的複製工作,比如接受全量複製的 RDB 文件以及服務器傳來的寫命令。同樣主服務器在接受從服務器的 socket 連接之後,將為該 socket 創建一個客戶端狀態,這時候的從服務器同時具有服務器和客戶端兩個身份,從服務器可以向主服務器發送命令請求而主服務器則會向從服務器返回命令回復。

3、發送 ping 命令

從服務器與主服務器連接成功后,做的第一件事情就是向主服務器發送一個 ping 命令,發送 ping 命令主要有以下目的:

  • 檢測主從之間網絡套接字是否可用
  • 檢測主節點當前是否可接受處理命令

在發送 ping 命令之後,正常情況下主服務器會返回 pong 命令,接受到主服務器返回的 pong 回復之後就會進行下一步工作,如果沒有收到主節點的 pong 回復或者超時,比如網絡超時或者主節點正在阻塞無法響應命令,從服務器會斷開複製連接,等待下一次定時任務的調度。

4、身份驗證

從服務器在接收到主服務器返回的 pong 回復之後,下一步要做的事情就是根據配置信息決定是否需要身份驗證:

  • 如果從服務器設置了 masterauth 參數,則進行身份驗證
  • 如果從服務器沒有設置 masterauth 參數,則不進行身份驗證

在需要身份驗證的情況下,從服務器將就向主服務器發送一條 auth 命令,命令參數為從服務器 masterauth 選項的值,舉個例子,如果從服務器的配置里將 masterauth 參數設置為:123456,那麼從服務器將向主服務器發送 auth 123456 命令,身份驗證的過程也不是一帆風順的,可能會遇到以下幾種情況:

  • 從服務器通過 auth 命令發送的密碼與主服務器的 requirepass 參數值一致,那麼將繼續進行後續操作,如果密碼不一致,主服務將返回一個 invalid password 錯誤
  • 如果主服務器沒有設置 requirepass 參數,那麼主服務器將返回一個 no password is set 錯誤

所有的錯誤情況都會令從服務器中止當前的複製工作,並且要從建立 socket 開始重新發起複制流程,直到身份驗證通過或者從服務器放棄執行複製為止

5、發送端口信息

在身份驗證通過後,從服務器將執行 REPLCONF listening 命令,向主服務器發送從服務器的監聽端口號,例如在我們的例子中從服務器監聽的端口為 6480,那麼從服務器將向主服務器發送 REPLCONF listening 6480 命令,主服務器接收到這個命令之後,會將端口號記錄在從服務器所對應的客戶端狀態的 slave_listening_port 屬性了,也就是我們在 master 服務器的 info replication 裏面看到的 port 值。

6、數據複製

數據複製是最複雜的一塊了,由 psync 命令來完成,從服務器會向主服務器發送一個 psync 命令來進行數據同步,在 redis 2.8 版本以前使用的是 sync 命令,除了命令不同之外,在複製的方式上也有很大的不同,在 redis 2.8 版本以前使用的都是全量複製,這對主節點和網絡會造成很大的開銷,在 redis 2.8 版本以後,數據同步將分為全量同步和部分同步。

  • 全量複製:一般用於初次複製場景,不管是新舊版本的 redis 在從服務器第一次與主服務連接時都將進行一次全量複製,它會把主節點的全部數據一次性發給從節點,當數據較大時,會對主節點和網絡造成很大的開銷,redis 的早期版本只支持全量複製,這不是一種高效的數據複製方式

  • 部分複製:用於處理在主從複製中因網絡閃斷等原因造成的數據丟失 場景,當從節點再次連上主節點后,如果條件允許,主節點會補發丟失數據 給從節點。因為補發的數據遠遠小於全量數據,可以有效避免全量複製的過高開銷,部分複製是對老版複製的重大優化,有效避免了不必要的全量複製操作

redis 之所以能夠支持全量複製和部分複製,主要是對 sync 命令的優化,在 redis 2.8 版本以後使用的是一個全新的 psync 命令,命令格式為:psync {runId} {offset},這兩個參數的意義:

  • runId:主節點運行的id
  • offset:當前從節點複製的數據偏移量

也許你對上面的 runid、offset 比較陌生,沒關係,我們先來看看下面三個概念:

1、複製偏移量

參与複製的主從節點都會分別維護自身複製偏移量:主服務器每次向從服務器傳播 N 個字節的數據時,就將自己的偏移量的值加上 N,從服務器每次接收到主服務器傳播的 N個字節的數據時,將自己的偏移量值加上 N。通過對比主從服務器的複製偏移量,就可以知道主從服務器的數據是否一致,如果主從服務器的偏移量總是相同,那麼主從數據一致,相反,如果主從服務器兩個的偏移量並不相同,那麼說明主從服務器並未處於數據一致的狀態,比如在有多個從服務器時,在傳輸的過程中某一個服務器離線了,如下圖所示:

由於從服務器A 在數據傳輸時,由於網絡原因掉線了,導致偏移量與主服務器不一致,那麼當從服務器A 重啟並且與主服務器連接成功后,重新向主服務器發送 psync 命令,這時候數據複製應該執行全量複製還是部分複製呢?如果執行部分複製,主服務器又如何補償從服務器A 在斷線期間丟失的那部分數據呢?這些問題的答案都在複製積壓緩衝區裏面

2、複製積壓緩衝區

複製積壓緩衝區是保存在主節點上的一個固定長度的隊列,默認大小為 1MB,當主節點有連接的從節點(slave)時被創建,這時主節點(master) 響應寫命令時,不但會把命令發送給從節點,還會寫入複製積壓緩衝區,如下圖所示:

因此,主服務器的複製積壓緩衝區裏面會保存着一部分最近傳播的寫命令,並且複製積壓緩衝區會為隊列中的每個字節記錄相應的複製偏移量。所以當從服務器重新連上主服務器時,從服務器通過 psync 命令將自己的複製偏移量 offset 發送給主服務器,主服務器會根據這個複製偏移量來決定對從服務器執行何種數據同步操作:

  • 如果從服務器的複製偏移量之後的數據仍然存在於複製積壓緩衝區裏面,那麼主服務器將對從服務器執行部分複製操作
  • 如果從服務器的複製偏移量之後的數據不存在於複製積壓緩衝區裏面,那麼主服務器將對從服務器執行全量複製操作

3、服務器運行ID

每個 Redis 節點啟動后都會動態分配一個 40 位的十六進制字符串作為運行 ID,運行 ID 的主要作用是用來唯一識別 Redis 節點,我們可以使用 info server 命令來查看

127.0.0.1:6379> info server
# Server
redis_version:5.0.5
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:2ef1d58592147923
redis_mode:standalone
os:Linux 3.10.0-957.27.2.el7.x86_64 x86_64
arch_bits:64
multiplexing_api:epoll
atomicvar_api:atomic-builtin
gcc_version:4.8.5
process_id:25214
run_id:7b987673dfb4dfc10dd8d65b9a198e239d20d2b1
tcp_port:6379
uptime_in_seconds:14382
uptime_in_days:0
hz:10
configured_hz:10
lru_clock:14554933
executable:/usr/local/redis-5.0.5/src/./redis-server
config_file:/usr/local/redis-5.0.5/redis.conf
127.0.0.1:6379> 

這裏面有一個run_id 字段就是服務器運行的ID

了解這幾個概念之後,我們一起來看看 psync 命令的運行流程,psync 命令運行流程如下圖所示:

psync 命令的邏輯比較簡單,整個流程分為兩步:

1、從節點發送 psync 命令給主節點,參數 runId 是當前從節點保存的主節點運行ID,參數offset是當前從節點保存的複製偏移量,如果是第一次參与複製則默認值為 -1。

2、主節點接收到 psync 命令之後,會向從服務器返回以下三種回復中的一種:

  • 回復 +FULLRESYNC {runId} {offset}:表示主服務器將與從服務器執行一次全量複製操作,其中 runid 是這個主服務器的運行 id,從服務器會保存這個id,在下一次發送 psync 命令時使用,而 offset 則是主服務器當前的複製偏移量,從服務器會將這個值作為自己的初始化偏移量
  • 回復 +CONTINUE:那麼表示主服務器與從服務器將執行部分複製操作,從服務器只要等着主服務器將自己缺少的那部分數據發送過來就可以了
  • 回復 +ERR:那麼表示主服務器的版本低於 redis 2.8,它識別不了 psync 命令,從服務器將向主服務器發送 sync 命令,並與主服務器執行全量複製

7、命令持續複製

當主節點把當前的數據同步給從節點后,便完成了複製的建立流程。但是主從服務器並不會斷開連接,因為接下來主節點會持續地把寫命令發送給從節點,保證主從數據一致性。

經過上面 7 步就完成了主從服務器之間的數據同步,由於這篇文章的篇幅比較長,關於全量複製和部分複製的細節就不介紹了,全量複製就是將主節點的當前的數據生產 RDB 文件,發送給從服務器,從服務器再從本地磁盤加載,這樣當文件過大時就需要特別大的網絡開銷,不然由於數據傳輸比較慢會導致主從數據延時較大,部分複製就是主服務器將複製積壓緩衝區的寫命令直接發送給從服務器。

心跳檢測

心跳檢測是發生在主從節點在建立複製后,它們之間維護着長連接並彼此發送心跳命令,便以後續持續發送寫命令,主從心跳檢測如下圖所示:

主從節點彼此都有心跳檢測機制,各自模擬成對方的客戶端進行通信,主從心跳檢測的規則如下:

  • 主節點默認每隔 10 秒對從節點發送 ping 命令,判斷從節點的存活性和連接狀態。可通過修改 redis.conf 配置文件裏面的 repl-ping-replica-period 參數來控制發送頻率
  • 從節點在主線程中每隔 1 秒發送 replconf ack {offset} 命令,給主節點 上報自身當前的複製偏移量,這條命令除了檢測主從節點網絡之外,還通過發送複製偏移量來保證主從的數據一致

主節點根據 replconf 命令判斷從節點超時時間,體現在 info replication 統 計中的 lag 信息中,我們在主服務器上執行 info replication 命令:

127.0.0.1:6379> info replication
# Replication
role:master
connected_slaves:1
slave0:ip=127.0.0.1,port=6480,state=online,offset=25774,lag=0
master_replid:c62b6621e3acac55d122556a94f92d8679d93ea0
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:25774
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:25774
127.0.0.1:6379> 

可以看出 slave0 字段的值最後面有一個 lag,lag 表示與從節點最後一次通信延遲的秒數,正常延遲應該在 0 和 1 之間。如果超過 repl-timeout 配置的值(默認60秒),則判定從節點下線並斷開複製客戶端連接,如果從節點重新恢復,心跳檢測會繼續進行。

主從拓撲架構

Redis的主從拓撲結構可以支持單層或多層複製關係,根據拓撲複雜性可以分為以下三種:一主一從、一主多從、樹狀主從架構

一主一從結構

一主一從結構是最簡單的複製拓撲結構,我們前面搭建的就是一主一從的架構,架構如圖所示:

一主一從架構用於主節點出現宕機時從節點 提供故障轉移支持,當應用寫命令併發量較高且需要持久化時,可以只在從節點上開啟 AOF,這樣既保證數據安全性同時也避免了持久化對主節點的性能干擾。但是這裡有一個坑,需要你注意,就是當主節點關閉持久化功能時, 如果主節點脫機要避免自動重啟操作。因為主節點之前沒有開啟持久化功能自動重啟后數據集為空,這時從節點如果繼續複製主節點會導致從節點數據也被清空的情況,喪失了持久化的意義。安全的做法是在從節點上執行 slaveof no one 斷開與主節點的複製關係,再重啟主節點從而避免這一問題

一主多從架構

一主多從架構又稱為星形拓撲結構,一主多從架構如下圖所示:

一主多從架構可以實現讀寫分離來減輕主服務器的壓力,對於讀佔比較大的場景,可以把讀命令發送到 從節點來分擔主節點壓力。同時在日常開發中如果需要執行一些比較耗時的讀命令,如:keys、sort等,可以在其中一台從節點上執行,防止慢查詢對主節點造成阻塞從而影響線上服務的穩定性。對於寫併發量較高的場景,多個從節點會導致主節點寫命令的多次發送從而過度消耗網絡帶寬,同時也加重了主節點的負載影響服務穩定性。

樹狀主從架構

樹狀主從架構又稱為樹狀拓撲架構,樹狀主從架構如下圖所示:

樹狀主從架構使得從節點不但可以複製主節 數據,同時可以作為其他從節點的主節點繼續向下層複製。解決了一主多從架構中的不足,通過引入複製中 間層,可以有效降低主節點負載和需要傳送給從節點的數據量。如架構圖中,數據寫入節點A 後會同步到 B 和 C節點,B節點再把數據同步到 D 和 E節點,數據實現了一層一層的向下複製。當主節點需要掛載多個從節點時為了避免對主節點的性能干擾,可以採用樹狀主從結構降低主節點壓力。

最後

目前互聯網上很多大佬都有 Redis 系列教程,如有雷同,請多多包涵了。原創不易,碼字不易,還希望大家多多支持。若文中有所錯誤之處,還望提出,謝謝。

歡迎掃碼關注微信公眾號:「平頭哥的技術博文」,和平頭哥一起學習,一起進步。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品在網路上成為最夯、最多人討論的話題?

※高價收購3C產品,價格不怕你比較

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!