垃圾也「三倍」? 研究:各國減塑政策來不及 2040年入海塑膠垃圾將增三倍

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※教你寫出一流的銷售文案?

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※回頭車貨運收費標準

※別再煩惱如何寫文案,掌握八大原則!

※超省錢租車方案

※產品缺大量曝光嗎?你需要的是一流包裝設計!

蝴蝶效應!中日韓梅雨季重災情 歸因於北極異常高溫

摘錄自2020年8月4日自由時報報導

中日韓3國今年受梅雨季暴雨影響,接連發生重大災情,南韓媒體引述專家說法稱,今年梅雨季的強降雨災情與全球暖化下的北極異常高溫有關。

根據《韓聯社》報導,今年梅雨季比起往年為長,中國華中地區、長江流域爆發劇烈洪災,日本九州暴雨重災,南韓強降雨亦造成死亡災情,其根本原因就是全球持續暖化。

《韓聯社》引述專家所言指出,在全球暖化下,北極氣溫大幅升高導致積雪及冰層融化,暴露出的黃褐色地表加劇從陽光吸取熱能,讓當地空氣溫度飆升。在暖空氣影響下,原本從東向西流動的冷空氣轉而朝南北方向作用,進而移動至中日韓等國。

南韓氣象廳則認為,今年梅雨季偏長因素雖不能僅歸因在全球暖化,但確實是氣候變化造成。

氣候變遷
土地利用
國際新聞
中國
日本
韓國
北極
極端高溫
全球暖化
災害

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※超省錢租車方案

※別再煩惱如何寫文案,掌握八大原則!

※回頭車貨運收費標準

※教你寫出一流的銷售文案?

※產品缺大量曝光嗎?你需要的是一流包裝設計!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

垃圾危害海洋 日生研發魚不愛吃塑膠袋

摘錄自2020年8月3日公視報導

有一群日本高中生研發出可以自動分解,而且魚類不小心吃下去,也會吐掉的塑膠袋;預計再過一兩年,就能夠上市使用。

神奈川縣川崎市的「洗足學園高中」,有學生團體突發奇想,決定研發能在大自然中自行分解,卻又不會被魚類跟海龜吞食的塑膠袋,於是用「苯甲地那銨」進行實驗。

在大學跟贊助企業的協助下,學生團隊實驗製作出含有2%到20%「苯甲地那銨」的可自動分解塑膠袋。實驗後發現,只要有「苯甲地那銨」4%含量,就會讓魚類在吃進塑膠時,因為苦味而吐出。

目前這款塑膠袋,以能在海洋分解的材質製造,只要進入海洋,到完全自然分解,只需要1到2年時間。目前學生團隊,還在研究新成分,期盼降低製造費用,以便廣為商家使用。另外也將持續改善,塑膠袋完全分解前,苦味可能已經消失的缺點,進行修正。

海洋
污染治理
國際新聞
日本
海洋垃圾
海洋生物
廢棄物

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

美南加州森火 毀約307座大安公園面積

摘錄自2020年8月3日公視報導

美國南加州、洛杉磯東邊櫻桃谷的森林大火,上星期五在聖伯蒂納市附近爆發後持續延燒,四天過去消防員只控制住12%的火勢,在風勢助長下,延燒面積已來到80平方公里,相當於燒掉307座大安森林公園。加州動員超過1300名消防員,從地面跟空中兵分兩路打火,當地至少2600戶住家,將近7800位居民緊急撤離,所幸目前還沒傳出人員傷亡。

由於當地氣候相當炎熱乾燥,最高溫可能達到攝氏41度,預料火勢還將持續擴大,至於起火原因已展開調查,不排除人為刻意縱火。

至於南美洲巴西境內,全球最大的熱帶濕地系統,潘特納爾濕地,今年7月份有將近1700個地點發生大火,創歷史新高,也是去年同期的三倍。

生物多樣性
國際新聞
美國
大火
山地
森林

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

澳洲史上最慘野火 30億動物死亡、流離失所

摘錄自2020年8月3日公視報導

世界自然基金會最新的研究指出,有將近30億隻澳洲動物在先前的森林大火中喪命或是逃離家園,是原先估計的三倍。而為了拯救無尾熊,保育人員利用「紅外線無人機」,來進行追蹤。

不只是棲息地遭到破壞,無尾熊還得面臨汽車路殺、野狗攻擊、披衣菌感染等疾病,還有氣候變遷的威脅。國際自然保育聯盟IUCN已將無尾熊列入「易危物種」,澳洲6月一份國會調查報告指出,生長在澳洲東岸的無尾熊,恐怕到了2050年就會滅絕,而2019-2020年發生的澳洲野火加速了它的進程。

世界自然基金會WWF也認同,利用紅外線無人機追蹤無尾熊是一大突破。之後隨著技術提升、成本下降,將漸漸可以不分晝夜追蹤出無尾熊的蹤跡。

WWF委託澳洲數間大學的科學家研究,7月28日發佈的報告指出,2019到2020年的野火,不但是現代史上最慘烈的野火災難之一,更導致近30億隻動物死亡或是流離失所,是先前預估的三倍。其中包含1.43億隻哺乳類、24.6億隻爬蟲類、1.8億隻鳥類,以及5100萬隻蛙類。

生物多樣性
國際新聞
澳洲
大火
野火
山地
森林

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

※教你寫出一流的銷售文案?

※超省錢租車方案

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※產品缺大量曝光嗎?你需要的是一流包裝設計!

玩轉華為物聯網IoTDA服務系列三-自動售貨機銷售分析場景示例

場景簡介

       通過收集自動售貨機系統的銷售數據,EI數據分析售貨銷量狀況。

        該場景主要描述的是設備可以通過MQTT協議與物聯網平台進行交互,應用側可以到物聯網平台訂閱設備側變化的通知,用戶可以在控制台或通過應用側接口創建數據轉發規則,把設備上報的屬性轉發給其他華為雲服務。

        核心知識點:產品模型、編輯碼插件、訂閱推送、屬性上報、MQTT協議、數據轉發規則。

場景流程

流程解釋:

        

  • 1、創建自動售貨機產品:物聯網平台以產品為粒度管理批量設備。用戶可以通過平台提供的API接口或控制台創建產品。

  • 2、上傳產品模型:產品模型是定義一種設備的基本屬性和命令。產品模型可以通過控制台,也可以導入公共產品庫的模型。該場景沒有編解碼插件,是因為設備是基於安卓操作系統開發的,能夠通過MQTT協議與平台進行交互。

  • 3、批量註冊自動售貨機設備:平台提供了應用側API接口可以註冊設備,也可以通過控制台批量註冊。註冊設備時獲取的設備ID,是設備側與平台交互的唯一標識。

  • 4、創建自動售貨機設備狀態變化的訂閱:售貨管理系統可以在平台創建設備變化的通知訂閱,需要把callback url即應用回調地址傳給平台,平台後續會推送通知到該url。

  • 5、設備建鏈:MQTT設備是指通過MQTT協議,不論是集成了華為IoT Device SDK,還是原生MQTT協議接入,只要是json數據格式傳輸給平台,平台就無需使用編解碼插件。如果是二進制上傳,則需要先做編解碼插件的開發。MQTT是長連接,需要先建鏈才能進行數據傳輸,可以通過安全加密方式8883端口接入(推薦),也可以通過非安全加密方式1883端口接入。

  • 6、推送自動售貨機設備激活通知:平台會根據之前應用訂閱的回調地址,把自動售貨機設備上線的通知類型通過HTTP/HTTPS推送回去。

  • 7、創建數據轉發規則:售貨管理系統可以通過API接口創建規則,也可以通過控制台創建,指定過濾指定的屬性,給指定的通道轉發數據。

  • 8、開通DIS通道/MRS服務:華為公有雲上有豐富的SaaS服務和PaaS服務,供您結合自己的業務需要進行組合使用。DIS服務提供高效採集、傳輸、分發能力,支持多種IoT協議,可以開通該服務,通過IoTDA規則引擎,把自動售貨機設備的數據轉發給DIS,然後再利用諸如MRS服務,實現自動售貨銷量狀況數據分析。

  • 9、自動售貨機屬性上報:設備側可以通過SDK或MQTT原生協議接入平台,屬性上報銷售信息。這裏值得注意的是,設備側上報的數據,是通過屬性上報,與消息上報最大的區別在於是否經過產品模型。屬性上報的內容與格式都要跟產品模型定義保持一致。具體概念介紹可以參閱“物模型”。

  • 10、按規則數據轉發:平台收到設備上報的屬性后,規則引擎會進行過濾(不論屬性還是消息,平台都會做規則過濾),把設定好的屬性值轉發到指定的DIS通道,然後再通過DIS的接口,由MRS去消費DIS的數據,實現對銷量的分析。

最佳實踐

場景說明

物聯網解決方案中,作為數據主體的“物”可能數量會非常大,產生的數據已經無法通過傳統的數據處理服務進行處理。如何分析與利用這龐大的物聯網設備數據對物聯網企業來說又是一個新的挑戰。

華為雲物聯網平台提供規則引擎能力,支持將數據上報的數據轉發至華為雲其他雲服務,可實現將海量數據通過數據接入服務(DIS)轉發至MapReduce服務(MRS),對數據進行處理后再由數據可視化服務(DLV)讀取數據呈現為可視化報表,實現數據的一站式採集、處理和分析。

在本示例中,我們實現下述場景:

自動售貨機每次銷售商品後上報銷售商品種類、數量、時間和所屬區域到物聯網平台,物聯網平台將數據通過數據接入服務轉發至MapReduce服務,MapReduce服務處理數據並寫為統計文件,數據可視化服務從統計文件讀取數據展現為四個維度的銷售報表。

創建MapReduce集群

創建集群,用於存儲和處理DIS轉儲的數據。

  1. 登錄華為雲官方網站,訪問MapReduce服務。

  2. 單擊“立即購買”,創建集群,以下配置僅為樣例。

    注:下圖以新版自定義購買界面為例,需要在“購買集群”界面點擊右上角的“點擊體驗新版”,然後選擇“自定義購買”。

參數名稱

說明

軟件配置

當前區域

保持默認。

集群名稱

自定義或保持默認。

集群版本

保持默認。

集群類型

分析集群。組件勾選Spark,系統會自動勾選Hive和Tez。“Hive使用外部數據源存儲元數據”保持關閉。

Kerberos認證

關閉。

用戶名

固定為“admin”不可修改。

密碼

自定義。

確認密碼

硬件配置

計費模式

按實際使用需求選擇,本示例中選擇“按需計費”。

網絡配置

全部保持默認。

實例

為節省實驗費用,可修改分析Core的實例數量為1,其餘保持默認值。密碼自定義。

高級配置均保持默認。

3.集群創建成功后,等待15到30分鐘,集群狀態變更為“運行中”則表示創建成功。

創建OBS桶

  1. 登錄華為雲官方網站,訪問對象存儲服務。

  2. 單擊“管理控制台”進入對象存儲服務管理控制台。

  3. 單擊頁面右上角的“創建桶”,根據需求選擇桶規格后,單擊“立即創建”。

創建數據接入通道和轉儲任務

創建通道並配置轉儲任務,實現將設備管理服務傳入DIS的數據轉發至MRS。

  1. 登錄華為雲官方網站,訪問數據接入服務。

  2. 單擊“立即購買”,購買接入通道,以下配置僅為樣例。

參數名稱

說明

區域

保持默認。

通道名稱

自定義或保持默認。

通道類型

保持默認值“普通”。

分區數量

按需填寫。

生命周期

源數據類型

選擇“JSON”。

自動擴縮容

保持關閉。

Schema開關

高級配置

保持默認。

3.通道購買成功后,進入DIS控制台接入管理 > 通道管理”頁面。

4.單擊需要查看的通道名稱,進入所選通道的管理頁面,選擇“轉儲管理”頁簽。

5.單擊“添加轉儲任務”按鈕。

6.在彈出的“添加轉儲任務”頁面配置轉儲相關配置項。

參數名稱

說明

源數據類型

默認為通道源數據類型

轉儲服務類型

選擇“MRS”。

任務名稱

自定義,如“iot_to_mrs”。

轉儲文件格式

選擇“Text”。

MRS集群

選擇已創建成功的MRS集群。

HDFS路徑

選擇轉儲文件要存儲的路徑,建議選擇“/user”。

轉儲文件目錄

自定義轉儲文件存放的文件夾名稱,本示例中為“temp”。

偏移量

選擇“最新”。

數據轉儲周期

本示例中修改為“60”。

數據臨時桶

選擇已創建的OBS桶。

數據臨時目錄

自定義,本示例中為“temp”。

7.單擊“立即創建”。

配置設備接入服務

在設備接入服務中創建產品模型、註冊設備並設置數據轉發規則,實現當設備上報數據時將數據轉發至DIS。

  1. 登錄華為雲官方網站,訪問設備接入服務。

  2. 單擊“立即使用”進入設備接入控制台。

  3. 單擊規則 > 創建規則 > 數據轉發”,首次創建對接到DIS服務的規則時,平台會根據對接的雲服務和區域彈出對應的雲服務訪問授權窗口。

4.單擊左側導航欄的“產品”,單擊右上角下拉框,選擇新建產品所屬的資源空間。

注:本文中使用的產品模型和設備僅為示例,您可以使用自己的產品模型和設備進行操作。

5.單擊右上角的“創建產品”,創建一個基於MQTT協議的產品,填寫參數后,單擊“確認”。

基本信息

產品名稱

自定義,如MQTT_Device

協議類型

選擇“MQTT”

數據格式

選擇“JSON”

廠商名稱

自定義

功能定義

選擇模型

請參考步驟6導入模型即可。

所屬行業

根據實際情況進行填寫。

設備類型

6.在功能定義頁面,單擊“上傳模型文件”,單擊Profile.zip,獲取產品模型文件樣例。

7.進入設備 > 設備註冊”頁面,單擊“註冊設備”,參考下錶填寫參數。

參數名稱

說明

所屬產品

選擇在步驟5中創建的產品。

設備標識碼

設備唯一物理標識,如IMEI、MAC地址等,用於設備在接入物聯網平台時攜帶該標識信息完成接入鑒權。

  • 原生MQTT設備:自定義,英文字母和数字的組合字符串。通過註冊成功後生成的“設備ID”(與設備標識碼一一對應)和“設備密鑰”接入平台。

  • NB-IoT設備、集成SDK的設備:NB-IoT設備上的IMEI或MAC地址。設備通過註冊時填寫的“設備標識碼”和“密鑰”接入平台。

設備名稱

自定義。

設備認證類型

選擇“密鑰”。

密鑰

設備密鑰,可自定義,不填寫物聯網平台會自動生成。

填寫完成后單擊“確定”,請注意保存註冊成功返回的“設備ID”和“設備密鑰”。

8.單擊左側導航欄的“規則”,單擊右上角的“創建規則”,選擇“數據轉發”。

9.填寫規則內容,規則名稱自定義,“數據類型”選擇“JSON”,轉發至“數據接入服務(DIS)”,“區域”選擇您開通OBS的區域,“通道”選擇您創建的桶,填寫完成后單擊“創建規則”。

配置數據可視化服務

配置數據可視化服務,新建數據報表視圖。

  1. 登錄華為雲官方網站,訪問數據可視化服務。

  2. 單擊“進入控制台”。

    注:若您未開通DLV服務,可單擊“體驗試用”獲取30天的基礎版免費試用。

  3. 訪問DLV控制台“我的大屏”頁面,新建一個大屏。

4.選擇空白模板,輸入大屏名稱后,單擊“創建大屏”。

5.單擊文本 > 標題”新增一個標題。

 

6.在右側“數據”面板修改靜態數據中“value”的值為“每日銷量”。

7.在大屏內拖動標題到左上角,並拉伸成合適的形狀。

8.單擊常用圖表 > 線狀圖”新增一個線狀圖報表。

9.拖動圖表到標題下面並拉伸成合適的形狀。

10.重複以上步驟再添加一個標題為“時間段銷量”柱狀圖,一個標題為“種類銷量”的餅狀圖,一個標題為“地區銷量”的區域排行圖,並根據自己的需要設置圖表的樣式。最終效果類似下圖。

11.單擊頁面右上角的返回按鈕退出編輯頁面。

 

驗證操作

1.首先控制設備上報10條數據。

  • 您可以使用配置設備接入服務時註冊的真實設備接入平台,上報數據。

  • 您也可以使用模擬器模擬設備上報數據,操作方法請參考通過MQTT.fx體驗設備接入。

    上報數據的樣例如下,請自行修改參數的取值模擬真實設備數據:

樣例1

{
	"msgType": "deviceReq",
	"data": [{
		"serviceId": "sales",
		"serviceData": {
			"category": "soda",
            "number": "1",
            "area": "SZLH",
            "timeStamp": "20190425T091157Z"
		}
	}]
}

上述樣例表示UTC時間2019年4月25日9點11分57秒深圳羅湖的自動販賣機銷售了一支蘇打飲料。

樣例2

{
	"msgType": "deviceReq",
	"data": [{
		"serviceId": "sales",
		"serviceData": {
			"category": "juice",
            "number": "2",
            "area": "SZFT"
            "timeStamp": "20190426T170005Z"
		}
	}]
}

上述樣例表示UTC時間2019年4月26日17點05秒深圳福田的自動販賣機銷售了兩支果汁飲料。

本文以上報下錶的數據為例。

category

number

area

timeStamp

soda

1

SZLH

20190425T091157Z

juice

1

SZFT

20190425T121511Z

sport

1

SZLH

20190425T172433Z

juice

2

SZFT

20190426T170005Z

soda

1

SZNS

20190426T190905Z

juice

1

SZNS

20190427T085959Z

juice

2

SZLH

20190427T111111Z

soda

3

SZFT

20190428T182215Z

sport

1

SZLH

20190429T205901Z

soda

1

SZLG

20190430T225045Z

2.登錄MRS管理控制台,選擇“集群列表 > 現有集群”,單擊集群名進入集群管理頁面。

3.單擊頁面上方的“文件管理”,再單擊“HDFS文件列表”,進入轉儲文件目錄(例如“temp”)查看是否存在轉儲的數據文件。

注:DIS會將數據合併轉發,所以此處的文件數量和上報的數據條數可能會不一致。

4.單擊頁面上方的“作業管理”,在“作業”頁簽中單擊“添加”,配置作業信息。本示例中創建一個spark類型的作業,實現分析設備上報數據,分別按日期、時間段、種類、區域統計銷量,將分析結果輸出為CSV文件並保存至OBS。

參數名稱

說明

作業類型

選擇“SparkSubmit”。

作業名稱

自定義,如“test”。

執行程序路徑

  1. 點擊下載jar包並上傳至OBS桶。

  2. 回到添加作業頁面,單擊“OBS”后單擊“瀏覽”選擇剛剛上傳的jar文件。

運行程序參數

左側選擇“–class”,右側輸入“com.huawei.bigdata.spark.examples.SalesStatistics”。

執行程序參數

輸入“AK SK inputpath outputpath”。

  • 其中AK SK填寫華為雲賬號的AK、SK,獲取方法可參考AK和SK的獲取方法。

  • inputpath填寫文件輸入路徑,在本樣例中為DIS的轉儲路徑,如“/user/temp”。

  • outputpath填寫文件輸出路徑,在本樣例中為已申請的OBS桶內的output文件夾(無需提前在OBS創建文件夾,MRS會自動創建),如“s3a://{OBS桶名稱}/output”。

服務配置參數

無需填寫。

配置完成后單擊“確定”啟動作業。

5.作業完成后,可在OBS桶內看到output文件夾,裏面有四個文件夾,每個文件夾內有一個“_SUCCESS”文件和一個“part”開頭的csv文件。

注:本實驗的樣例程序分析數據時會將UTC時間轉換為本地時間,因此數據分析結果中的日期與時間段數值會和上報時的數值不一致。

6.登錄華為雲官方網站,訪問數據可視化服務。

7.單擊“進入控制台”。

8.單擊“我的數據 > 新建數據連接” ,在“新建數據連接”頁面左側的數據庫類型中,選擇“CSV文件”,按照下錶的數據規劃填寫配置后單擊“確定”。重複本步驟建立4個數據連接。

參數名

說明

名稱

建立4個數據連接,分別命名為:

  • salesByDate

  • salesByTime

  • salesByCategory

  • salesByArea

Access Key

填寫華為雲賬號的AK、SK,獲取方法可參考AK和SK的獲取方法。

Secret Access Key

文件來源

選擇“OBS文件”。

文件路徑

4個連接分別選擇步驟5的output文件夾內和連接同名的文件夾內的csv文件。

9.返回“我的大屏”頁簽,單擊配置數據可視化服務時創建的大屏右下的編輯按鈕進入編輯頁面。

10.選中“每日銷量”表,在右側數據面板選擇數據類型為“CSV文件”,數據連接選擇步驟8添加的數據連接“salesByDate”。

11.根據響應數據的屬性名稱配置字段映射。

配置 “x”為 “saleDate”, “y”為 “saleNumber”。

12.選中“時間段銷量”表,在右側數據面板選擇數據類型為“CSV文件”,數據連接選擇步驟8添加的數據連接“salesByTime”。

13.根據響應數據的屬性名稱配置字段映射。

配置 “x”為 “saleTime”, “y”為 “saleNumber”。

14.選中“種類銷量”表,在右側數據面板選擇數據類型為“CSV文件”,數據連接選擇步驟8添加的數據連接“salesByCategory”。

15.根據響應數據的屬性名稱配置字段映射。

配置 “s”為 “category”, “y”為 “saleNumber”,並設置各個分類的名稱(本示例中為“soda”,“juice”,“sport”)和圖例的顏色。

16.選中“地區銷量”表,在右側數據面板選擇數據類型為“CSV文件”,數據連接選擇步驟8添加的數據連接“salesByArea”。

17.根據響應數據的屬性名稱配置字段映射。

配置 “num”為 “saleNumber”。

18.全部圖表配置完成后,單擊頁面右上角的可預覽報表,示例如下圖。

至此,通過該文檔的學習,您應該對自動售貨機銷售分析場景有了一定的了解。接下來,可以在系列後續文章中,可以學習到更多的物聯網業務場景。

添加華為IoT小助手(微信號:huawei-iot,回復“博客園”)獲取此物聯網免費學習課程。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※教你寫出一流的銷售文案?

RocketMQ系列(五)廣播與延遲消息

今天要給大家介紹RocketMQ中的兩個功能,一個是“廣播”,這個功能是比較基礎的,幾乎所有的mq產品都是支持這個功能的;另外一個是“延遲消費”,這個應該算是RocketMQ的特色功能之一了吧。接下來,我們就分別看一下這兩個功能。

廣播

廣播是把消息發送給訂閱了這個主題的所有消費者。這個定義很清楚,但是這裏邊的知識點你都掌握了嗎?咱們接着說“廣播”的機會,把消費者這端的內容好好和大家說說。

  • 首先,消費者端的概念中,最大的應該是消費者組,一個消費者組中可以有多個消費者,這些消費者必須訂閱同一個Topic。
  • 那麼什麼算是一個消費者呢?我們在寫消費端程序時,看到了setConsumeThreadMax這個方法,設置消費者的線程數,難道一個線程就是一個消費者?錯!這裏的一個消費者是一個進程,你可以理解為ip+端口。如果在同一個應用中,你實例化了兩個消費者,這兩個消費者配置了相同的消費者組名稱,那麼應用程序啟動時會報錯的,這裏不給大家演示了,感興趣的小夥伴私下里試一下吧。
  • 同一個消息,可以被不同的消費者組同時消費。假設,我有兩個消費者組cg-1和cg-2,這兩個消費者組訂閱了同一個Topic,那麼這個Topic的消息會被cg-1和cg-2同時消費。那這是不是廣播呢?錯!當然不是廣播,廣播是同一個消費者組中的多個消費者都消費這個消息。如果配置的不是廣播,像前幾個章節中的那樣,一個消息只能被一個消費者組消費一次。

好了,說了這麼多,我們實驗一下吧,先把消費者配置成廣播,如下:

@Bean(name = "broadcast", initMethod = "start",destroyMethod = "shutdown")
public DefaultMQPushConsumer broadcast() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast");
    consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
    consumer.subscribe("cluster-topic","*");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        for (MessageExt msg : msgs) {
            System.out.println(new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    return consumer;
}

  • 其中,NameServer,訂閱的Topic都沒有變化。
  • 注意其中consumer.setMessageModel(MessageModel.BROADCASTING);這段代碼,設置消費者為廣播。咱們可以看一下,MessageModel枚舉中只有兩個值,BROADCASTINGCLUSTERING,默認為CLUSTERING

因為要測試廣播,所以我們要啟動多個消費者,還記得什麼是消費者嗎?對了,一個ip+端口算是一個消費者,在這裏我們啟動兩個應用,端口分別是8080和8081。發送端的程序不變,如下:

@Test
public void producerTest() throws Exception {

    for (int i = 0;i<5;i++) {
        MessageExt message = new MessageExt();
        message.setTopic("cluster-topic");
        message.setKeys("key-"+i);
        message.setBody(("this is simpleMQ,my NO is "+i+"---"+new Date()).getBytes());
        SendResult sendResult = defaultMQProducer.send(message);
        System.out.println("i=" + i);
        System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());
    }
}

我們執行一下發送端的程序,日誌如下:

i=0
BrokerName:broker-a
i=1
BrokerName:broker-a
i=2
BrokerName:broker-b
i=3
BrokerName:broker-b
i=4
BrokerName:broker-b

再來看看8080端口的應用後台打印出來的日誌:

消費了5個消息,再看看8081的後台打印的日誌,

也消費了5個。兩個消費者同時消費了消息,這就是廣播。有的小夥伴可能會有疑問了,如果不設置廣播,會怎麼樣呢?私下里實驗一下吧,上面的程序中,只要把設置廣播的那段代碼註釋掉就可以了。運行的結果當然是只有一個消費者可以消費消息。

延遲消息

延遲消息是指消費者過了一個指定的時間后,才去消費這個消息。大家想象一個電商中場景,一個訂單超過30分鐘未支付,將自動取消。這個功能怎麼實現呢?一般情況下,都是寫一個定時任務,一分鐘掃描一下超過30分鐘未支付的訂單,如果有則被取消。這種方式由於每分鐘查詢一下訂單,一是時間不精確,二是查庫效率比較低。這個場景使用RocketMQ的延遲消息最合適不過了,我們看看怎麼發送延遲消息吧,發送端代碼如下:

@Test
public void producerTest() throws Exception {

    for (int i = 0;i<1;i++) {
        MessageExt message = new MessageExt();
        message.setTopic("cluster-topic");
        message.setKeys("key-"+i);
        message.setBody(("this is simpleMQ,my NO is "+i+"---"+new Date()).getBytes());
        message.setDelayTimeLevel(2);
        SendResult sendResult = defaultMQProducer.send(message);
        System.out.println("i=" + i);
        System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());
    }
}
  • 我們只是增加了一句message.setDelayTimeLevel(2);
  • 為了方便,這次我們只發送一個消息。

setDelayTimeLevel是什麼意思,設置的是2,難道是2s后消費嗎?怎麼參數也沒有時間單位呢?如果我要自定義延遲時間怎麼辦?我相信很多小夥伴都有這樣的疑問,我也是帶着這樣的疑問查了很多資料,最後在RocketMQ的Github官網上看到了說明,

  • 在RocketMQ的源碼中,有一個MessageStoreConfig類,這個類中定義了延遲的時間,我們看一下,
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  • 我們在程序中設置的是2,那麼這個消息將在5s以後被消費。
  • 目前RocketMQ還不支持自定義延遲時間,延遲時間只能從上面的時間中選。如果你非要定義一個時間怎麼辦呢?RocketMQ是開源的,下載代碼,把上面的時間改一下,再打包部署,就OK了。

再看看消費端的代碼,

@Bean(name = "broadcast", initMethod = "start",destroyMethod = "shutdown")
public DefaultMQPushConsumer broadcast() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast");
    consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
    consumer.subscribe("cluster-topic","*");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        for (MessageExt msg : msgs) {
            Date now = new Date();
            System.out.println("消費時間:"+now);

            Date msgTime = new Date();
            msgTime.setTime(msg.getBornTimestamp());
            System.out.println("消息生成時間:"+msgTime);

            System.out.println(new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    return consumer;
}
  • 我們還是使用廣播的模式,沒有變。
  • 打印出了當前的時間,這個時間就是消費的時間。
  • 通過msg.getBornTimestamp()方法,獲得了消息的生成時間,也打印出來,看看是不是延遲5s。

啟動兩個消費者8080和8081,發送消息,再看看消費者的後台日誌,

消費時間:Thu Jun 11 14:45:53 CST 2020
消息生成時間:Thu Jun 11 14:45:48 CST 2020
this is simpleMQ,my NO is 0---Thu Jun 11 14:45:47 CST 2020

我們看到消費時間比生成時間晚5s,符合我們的預期。這個功能還是比較實用的,如果能夠自定義延遲時間就更好了。

總結

RocketMQ的這兩個知識點還是比較簡單的,大家要分清楚什麼是消費者組,什麼是消費者,什麼是消費者線程。另外就是延遲消息是不支持自定義的,大家可以在Github上看一下源碼。好了~今天就到這裏了。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心

網絡KPI異常檢測之時序分解算法

時間序列數據伴隨着我們的生活和工作。從牙牙學語時的“1, 2, 3, 4, 5, ……”到房價的走勢變化,從金融領域的刷卡記錄到運維領域的核心網性能指標。時間序列中的規律能加深我們對事物和場景的認識,時間序列中的異常能提醒我們某些部分可能出現問題。那麼如何去發現時間序列中的規律、找出其中的異常點呢?接下來,我們將揭開這些問題的面紗。

什麼是異常

直觀上講,異常就是現實與心理預期產生較大差距的特殊情形。如2020年春節的新型肺炎(COVID-19,coronavirus disease 2019),可以看到2月12日有一個明顯的確診病例的升高,這就是一個異常點,如下圖:

從統計上講,嚴重偏離預期的點,常見的可以通過3-sigma準則來判定。

從數學上講,它就是一個分段函數:

那麼我們有哪些方法來發現異常呢?異常分析的方法有很多,在本文中,我們主要講解時間序列分解的算法。接下來,我們先從時間序列的定義開始講起。

什麼是時間序列

前面章節,我們列舉了生活和工作中的一些時間序列的例子,但是並沒有給出定義。在本節中,我們將首先給出時間序列的定義,然後給出時間序列的分類方法,最後再給大家展示常見的時間序列。

1.時間序列的定義

時間序列是不同時間點的一系列變量所組成的有序序列。例如北京市2013年4月每日的平均氣溫就構成了一個時間序列,為了方便,我們一般認為序列中相鄰元素具有相同的時間間隔。

時間序列可以分為確定的和隨機的。例如,一個1990年出生的人,從1990年到1999年年齡可以表述為{0,1,2,…,9},這個序列並沒有任何隨機因素。這是一個確定性的時間序列。現實生活中我們所面對的序列更多的是摻雜了隨機因素的時間序列,例如氣溫、銷售量等等,這些是帶有隨機性的例子。我們說的時間序列一般是指帶有隨機性的。

那麼對於隨機性的時間序列,又如何進行分類呢?

2.時間序列的分類

從研究對象上分,時間序列分為一元時間序列和多元時間序列,如新冠肺炎例子中,只看確診病例的變化,它是一元時間序列。如果把確診病例和疑似病例聯合起來看,它是一個多元時間序列。

從時間參數上分,時間序列分為離散時間的時間序列和連續時間的時間序列。例如氣溫變化曲線,通常是按照天、小時進行預測、計算的,這個採集的時間是離散的,因此,它是一個離散時間的時間序列。再如花粉在水中呈現不規則的運動,它無時無刻不在運動,它是一個連續時間的時間序列,這就是大家眾所周知的布朗運動。在我們的工作中,我們一般遇到的都是離散時間的時間序列。

從統計特徵上分,時間序列分為平穩時間序列和非平穩時間序列。平穩序列從直觀上講,均值和標準差不隨着時間發生變化,而非平穩序列均值或者標準差一般會隨着時間發生變化。下面兩個圖分別給出平穩序列和非平穩序列的例子。

3.常見的時間序列

在本節,我們將給大家列舉一些常見的時間序列,讓大家對常見的時間序列有一個直觀的概念。

時間序列的分解

前面給大家講了異常和時間序列的概念,本章將給大家講解時間序列分解技術。

1.目的

時間序列分解是探索時序變化規律的一種方法,主要探索周期性和趨勢性。基於時序分解的結果,我們可以進行後續的時間預測和異常檢測。

2.主要組成部分

在時間序列分析中,我們經常要關注趨勢和周期。因此,一般地,我們將時序分成三個部分:趨勢部分、周期部分和殘差部分。結合下圖CO2含量的例子(見下圖)對這三個主要部分進行解釋:

1)趨勢部分:展示了CO2含量逐年增加;

2)周期部分:反應了一年中CO2含量是周期波動的;

3)殘差部分:趨勢和周期部分不能解釋的部分。

3.時序分解模型

時間序列分解基於分解模型的假設。通常,我們會考慮以下兩種模型:

加法模型適用於以下場景:

  1. 當周期性不隨着趨勢發生變化時,首選加法模型,如下圖(a);
  2. 當目標存在負值時,應選擇加法模型;

乘法模型適用於以下場景:

  1. 周期隨着隨時發生變化時,首選乘法模型,如下圖(b);
  2. 經濟數據,首選乘法模型(增長率、可解釋)。

另外,當我們不清楚選擇哪個模型時,可以兩個模型都使用,選擇誤差最小的那一個。

由於乘法模型與加法模型可以相互轉化,我們後面僅以加法模型來進行介紹。

4.時序分解算法

基於周期、趨勢分解的時序分解算法主要有經典時序分解算法、Holt-Winters算法和STL算法。經典時序分解算法起源於20世紀20年代,方法較簡單。Holt-Winters算法於1960年由Holt的學生 Peter Winters 提出,能夠適應隨着時間變化的季節項。STL(Seasonal and Trend decomposition using Loess)分解法,由Cleveland 等於1990年提出,比較通用,且較為穩健。三者之間的關係,如下圖所示:

 

4.1經典時序分解算法

經典時序分解算法是最簡單的一種分解算法,它是很多其他分解算法的基礎。該算法基於“季節部分不隨着時間發生變化”這一假設,且需要知道序列的周期。另外,該算法基於滑動平均技術。

其中,m=2k+1. 也就是說,時刻t的趨勢項的估計值可以通過前後k個時刻內的平均值得到。階數 m 越大,趨勢越光滑。由上面的公式可以看出,m一般取奇數,這保證了對稱性。但是在很多場景下,周期是偶數,例如一年有4個季度,則周期為4,是偶數。此時,需要做先做一個4階滑動平均(4-MA),再對所得結果做一個2階滑動平均(2-MA),整個過程記為 。這樣處理后的結果是對稱的,即加權的滑動平均,數學表達如下:

下面我們將講解經典時序分解算法的計算步驟。

經典時序分解算法雖然簡單、應用廣泛,但是也存在一些問題:

1) 無法估計序列最前面幾個和最後面幾個的趨勢和周期部分,例如若m=4,則無法估計前2個和后2個觀測的趨勢和周期的部分;

2) 嚴重依賴“季節性部分每個周期都是相同的”這一假設;

3) 過度光滑趨勢部分。

4.2Holt-Winters算法

在上一節中,我們介紹了經典時序分解算法,但是它嚴重依賴“季節性部分每個周期都是相同的”這一假設。為了能夠適應季節部分隨時間發生變化,Holt-Winters算法被提出。Holt-Winters算法是基於簡單指數光滑技術。首先,我們先介紹簡單指數光滑技術。

簡單指數光滑的思想主要是以下兩點:

  1. 對未來的預測:用當前的水平對下一時刻的點進行預測;
  2. 當前水平的估計:使用當前時刻的觀測值和預測值(基於歷史觀測數據的預測值,即上一時刻的水平)的加權平均作為當前水平的估計。

簡單指數光滑的模型比較簡單,如下:

Holt-Winters算法是簡單指數光滑在趨勢(可理解為水平的變化率)和季節性上的推廣,主要包括水平(前文中的趨勢項)、趨勢項和季節項三個部分。

4.3 STL算法

STL(Seasonal and Trend decomposition using Loess)是一個非常通用的、穩健性強的時序分解方法,其中Loess是一種估算非線性關係的方法。STL分解法由 Cleveland et al. (1990) 提出。

STL算法中最主要的是局部光滑技術 (locally weighted scatterplot smoothing, LOWESS or LOESS),有時也稱為局部多項式回歸擬合。它是對兩維散點圖進行平滑的常用方法,它結合了傳統線性回歸的簡潔性和非線性回歸的靈活性。當要估計某個響應變量值時,先從其預測點附近取一個數據子集(如下圖實點 是要預測的點,選取周圍的需點來進行擬合),然後對該子集進行線性回歸或二次回歸,回歸時採用加權最小二乘法(如下圖,採用的是高斯核進行加權),即越靠近估計點的值其權重越大,最後利用得到的局部回歸模型來估計響應變量的值。用這種方法進行逐點運算得到整條擬合曲線。

STL算法的主要環節包含內循環、外循環和季節項后平滑三個部分:

  • 內循環:
  • 外循環:

外循環主要作用則是引入了一個穩健性權重項,以控制數據中異常值產生的影響,這一項將會考慮到下一階段內循環的臨近權重中去。

  • 季節項后平滑:

趨勢分量和季節分量都是在內循環中得到的。循環完后,季節項將出現一定程度的毛刺現象,因為在內循環中平滑時是在每一個截口中進行的,因此,在按照時間序列重排后,就無法保證相鄰時段的平滑了,為此,還需要進行季節項的后平滑,后平滑基於局部二次擬合,並且不再需要在loess中進行穩健性迭代。

異常判斷的準則

對於異常的判斷,我們常用的有 n-sigma 準則和boxplot準則(箱線圖準則)。那這些準備是如何計算的,有哪些區別和聯繫呢?

1.n-sigma 準則

n-sigma準則有計算簡單、效率高且有很強的理論支撐,但是需要近似正態的假設,且均值和標準差的計算用到了全部的數據,因此,受異常點的影響較大。

2.boxplot 準則

為了降低異常點的影響,boxplot準則被提出。boxplot(箱線圖)是一種用作显示一組數據分散情況的統計圖,經常用於異常檢測。BoxPlot的核心在於計算一組數據的中位數、兩個四分位數、上限和下限,基於這些統計值畫出箱線圖。

根據上面的統計值就可以畫出下面的圖,超過上限的點或這個低於下限的點都可以認為是異常點。

從上面的計算上可以看出,boxplot對異常點是穩健的。

基於時序分解的異常檢測算法

在前面的章節,我們了解了時序分解的算法,也學習了異常判斷的準則,那麼如何基於時序分解進行異常檢測呢?在本章,我們將首先給出異常檢測算法的原理,再給出基於時序分解的異常檢測算法步驟。

1.異常檢測算法原理

回顧一下異常的定義,它是一個分段函數:

我們可以看到預測值(擬合值)和閾值是不知道的。對於預測值,我們可以通過找規律來猜這個預測值是多少,本章我們可以通過時序分解找周期和趨勢的規律,進而得到預測值。對於閾值,我們可以看到閾值是針對真實值和預測值的差值設置的,目的是把異常值找到,因此我們只要找到正常值的殘差和異常值的殘差的邊界即可。而我們n-sigma準則和boxplot準則就可以根據殘差把邊界找出來,即閾值。這個思考和實現的過程示意圖如下:

2.基於時序分解的異常檢測算法

Demo代碼下載地址 ,本文主要是想記錄基於時間序列的異常檢測方法,希望能夠幫到你。

 

點擊關注,第一時間了解華為雲新鮮技術~

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

USB CONNECTOR掌控什麼技術要點? 帶您認識其相關發展及效能

台北網頁設計公司這麼多該如何選擇?

※智慧手機時代的來臨,RWD網頁設計為架站首選

※評比南投搬家公司費用收費行情懶人包大公開

※回頭車貨運收費標準

曹工說JDK源碼(4)–抄了一小段ConcurrentHashMap的代碼,我解決了部分場景下的Redis緩存雪崩問題

曹工說JDK源碼(1)–ConcurrentHashMap,擴容前大家同在一個哈希桶,為啥擴容后,你去新數組的高位,我只能去低位?

曹工說JDK源碼(2)–ConcurrentHashMap的多線程擴容,說白了,就是分段取任務

曹工說JDK源碼(3)–ConcurrentHashMap,Hash算法優化、位運算揭秘

什麼是緩存雪崩

基本概念梳理

這個基本也是redis 面試的經典題目了,然而,網上不少博客對這個詞的定義都含糊不清,各執一詞。

主要有兩類說法:

  • 大量緩存key,由於設置了相同的過期時間,在某個時刻同時失效,導致此刻的查詢請求,全部湧向db,本來db的tps大概是幾千左右,結果湧入了幾十萬的請求,那db肯定直接就扛不住了

    這種說法下面,解決方案一般是,把過期時間增加一個隨機值,這樣,也就不會大批量的key同時失效了

  • 另外一種說法是,本來redis扛下了大部分的請求,但是,由於緩存所在的機器,發生了宕機。此時,緩存這台機器之間就連不上了,redis服務也掛了,此時,你的服務里,發現redis取不到,然後全都跑去查數據庫,那,就發生和前面一樣的情況了,請求全部湧向db,db無響應。

兩類說法,也不用覺得,這個對,那個不對,不過是一個技術名詞,當初發明這個詞的人,估計也沒想那麼多,結果傳播開來之後,就變成了現在這個樣子。

我們這裏主要採用下面那一種說法,因為下面這種說法,其實是已經包含了上面的情景。但是,下面這種場景,要複雜的多,因為redis此時就是一個完全不可信的東西了,你得想好,怎麼不讓它掛掉,那是不是應該部署sentinel、cluster集群?同時,持久化必須要開啟。

這樣呢,掛掉后,短暫的不可用之後,大概幾十s吧,緩存集群就恢復了,就又可用了。

同時,我們還得考慮,假設,現在redis掛了,我們代碼的降級策略是什麼?

大家發現redis掛了,首先,估計是會拋異常了,連接超時;拋了異常后,要直接拋到前端嗎?作為一個穩健的後端程序,那肯定是不行的,你redis掛了,數據庫又沒掛;好吧,那我們就大家一起去查數據庫。

結果,大量的查詢請求,就烏泱泱地跑去查庫了,然後,db卒。這個肯定不行。

所以,我們必須要控制的一點是,當發現某個key失效了,不是大家都去查庫,而是要進行 併發控制

什麼是併發控制?就是不能全部放過去查庫,只能放少部分,免得把脆弱的db打死。

併發控制,基本就是要爭奪去查庫的權利了,這一步,基本就是一個選舉的過程,可以通過搶鎖的方式,比如Reentrentlock,synchronized,cas也可以。

  1. 搶到鎖的線程,有資格去查庫,其他線程要麼被阻塞,要麼自旋

  2. 搶到鎖的線程,去查庫,查到數據后,將數據存放在某個地方,通知其他線程去取(如果其他線程被阻塞的話);或者,如果其他線程沒被阻塞,比如sleep 50ms,再去指定的地方拿數據那種,這種就不需要通知

    總之,如果其他線程要我們通知,我們就通知;不要我們通知,我們就不通知。

搶到鎖的線程,在構建緩存時,其他線程應該干什麼?

  1. 在while(true)里,sleep 50ms,然後再去取數據

    這種類似於忙等待,但是每次sleep一會,所以還不錯

  2. 將自己阻塞,等待搶到鎖的線程,構建完緩存后,來喚醒

  3. 在while(true)里,一直忙循環,期間一直檢查數據是否已經ok了,這種方案呢,要看裏面:檢查數據的操作,是否耗時;如果只是檢查jvm內存里的數據,那還好;否則的話,假設要去檢查redis的話,這種io比較耗時的操作的話,就不合適了,cpu會一直空轉。

本文採用的方案

主線程構建緩存時,其他線程,在while(true)里,sleep 一定時間,然後再檢查數據是否ready。

說了這麼多,好像和題目里的concurrenthashmap沒啥關係,不,是有關係的,因為,這個思路,其實就是來自於concurrentHashMap。

ConcurrentHashMap中,是怎麼去初始化底層數組的

在我們用無參構造函數,去new一個ConcurrentHashMap時,此時還不會去創建底層數組,這個是一個小優化。什麼時候創建數組呢,是在我們第一次去put的時候。

put的時候,會調用putVal。

其中,putVal代碼如下:

    transient volatile Node<K,V>[] table;

	final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
      	// 1
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
          	// 2
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
  • 1處,把field table,賦值給局部變量tab

  • 2處,如果tab為null,則進行initTable初始化

    這個2處,在多線程put的時候,是可能多個線程同時進來的。有併發問題。

我們接下來,看看initTable是怎麼解決這個問題的,畢竟,我們new數組,只new一次即可,new那麼多次,沒用,對性能有損耗。所以,這裏面肯定會多線程爭奪初始化權利的代碼。

	private transient volatile int sizeCtl;
	transient volatile Node<K,V>[] table;

	/**
     * Initializes table, using the size recorded in sizeCtl.
     */
    private final Node<K,V>[] initTable() {
        Node<K,V>[] tab;
      	int sc;
      	
      	// 0
        while ((tab = table) == null || tab.length == 0) {
          	// 1
            if ((sc = sizeCtl) < 0)
                Thread.yield(); // lost initialization race; just spin
          	// 2
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                  	// 3
                    if ((tab = table) == null || tab.length == 0) {
                      	// 4
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
                        table = tab = nt;
                        sc = n - (n >>> 2);
                    }
                } finally {
                  	// 5
                    sizeCtl = sc;
                }
                break;
              
            }// end if
          
        }// end while
        return tab;
    }
  • 1處,這裏把sizeCtl,賦值給局部變量sc。這裏的sizeCtl是一個很重要的field,當我們new完之後,默認這個字段,要麼為0,要麼為準備創建的底層數組的長度。

    這裏去判斷是否小於0,那肯定不滿足,小於0,會是什麼意思?當某個線程,搶到了這個initTable中的底層數組的創建權利時,就會把sizeCtl改為 -1。

    所以,這裏的意思是,看看是否已經有其他線程在初始化了,如果已經有了,則直接調用:

    Thread.yield();

    這個方法的意思是,暗示操作系統,自己準備放棄cpu;但操作系統,自有它自己的線程調度規則,所以,這個方法可能沒什麼效果;我們業務代碼,這裏一般可以修改為Thread.sleep。

    這個方法調用完成后,後續也沒有其他代碼,所以會直接跳轉到循環開始處(0處代碼),判斷table是否初始化ok了,如果沒有ok,則會繼續進來。

  • 2處,使用cas,如果此時,sizeCtl的值等於sc的值,就修改sizeCtl為 -1;如果成功,則返回true,進入3處

    否則,會跳轉到0處,繼續循環。

  • 3處,雖然搶到了控制權,但是這裏還是要再判斷一下,不然可能出現重複初始化,即,不加這一行,4處的代碼,會被重複執行

  • 4處開始,這裏去執行真正的初始化邏輯。

    // 
    int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
    @SuppressWarnings("unchecked")
    // 1
    Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
    // 2
    table = tab = nt;
    sc = n - (n >>> 2);
    

    這裏的1處,new數組;2處,賦值給field:table;此時,因為table 這個field是volatile修飾的,所以其他線程會馬上感知到。0處代碼就不會為true了,就不會繼續循環了。

  • 5處,修改sizeCtl為正數。

這裏說下,為啥要加3處的那個判斷。

現在,假設線程A,在初始化完成后,走到了5處,修改了sizeCtl為正數;而線程B,剛好執行1處代碼:

// 1
if ((sc = sizeCtl) < 0)

那肯定,1處就不滿足了;然後就會進到2處,cas修改成功,進行初始化。沒有3處判斷的話,就會重複初始化。

基於concurrentHashmap,實現我們的緩存雪崩方案

我這裏的方案,還是比較簡單那種,就是,n個線程同時爭奪構建緩存的權利;winner線程,構建緩存后,會把緩存設置到redis;其他線程則是一直在while(true)里sleep一段時間,然後檢查redis里的數據是否不為空。

這個方案中,redis掛了這種情況,是沒在考慮中的,但是一個方案,沒辦法立馬各方面全部到位,後續我再完善一下。

不考慮緩存雪崩的代碼

@Override
public Users getUser(long userId) {
    ValueOperations<String, Users> ops = redisTemplate.opsForValue();
  	// 1
    Users s = ops.get(String.valueOf(userId));
    if (s == null) {
        /**
         * 2 這裏要去查庫獲取值
         */
        Users users = getUsersFromDB(userId);
		// 3
        ops.set(String.valueOf(users.getUserId()),users);

        return users;
    }

    return s;
}

private Users getUsersFromDB(long userId) {
    Users users = new Users();

    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("spent 1s to get user from db");
    users.setUserId(userId);
    users.setUserName("zhangsan");

    return users;
}

直接看上面的1,2,3處。就是檢查、構建緩存,設置到緩存的過程。

考慮緩存雪崩的代碼

	// 1
	private volatile int initControl;

	@Override
    public Users getUser(long userId) {
        ValueOperations<String, Users> ops = redisTemplate.opsForValue();

        Users users;
        while (true) {
          	// 2
            users = ops.get(String.valueOf(userId));
            if (users != null) {
              	// 3 
                break;
            }
			
          	// 4
            int initControlLocal = initControl;
            /**
             * 5 如果已經有線程在進行獲取了,則直接放棄cpu
             */
            if (initControlLocal < 0) {
//                log.info("initControlLocal < 0,just yield and wait");
//                Thread.yield();
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    log.warn("e:{}", e);
                }
                continue;
            }


            /**
             * 6 爭奪控制權
             */
            boolean bGotChanceToInit = U.compareAndSwapInt(this,
                    INIT_CONTROL, initControlLocal, -1);
          	// 7
            if (bGotChanceToInit) {
                try {
                  	// 8
                    users = ops.get(String.valueOf(userId));
                    if (users == null) {
                        log.info("got change to init");
                        /**
                         * 9 這裏要去查庫獲取值
                         */
                        users = getUsersFromDB(userId);
                        ops.set(String.valueOf(users.getUserId()), users);
                        log.info("init over");
                    }
                } finally {
                  	// 10
                    initControl = 0;
                }

                break;
            }// end if (bGotChanceToInit)
        }// end while


        return users;
    }
  • 1處,定義了一個field,initControl;默認為0.線程們會去使用cas,修改為-1,成功的線程,即獲得初始化緩存的權利。

    注意,要定義為volatile,保證線程間的可見性

  • 2處,去redis獲取緩存,如果不為null,直接返回

  • 4處,如果沒取到緩存,則進入此處;此處,將field:initControl賦值給局部變量

  • 5處,判斷局部變量initControlLocal,是否小於0;小於0,說明已經有線程在進行初始化了,直接contine,繼續下一次循環

  • 6處,如果當前還沒有線程在初始化,則開始競爭初始化的權利,誰成功地用cas,修改field:initControl為-1,誰就獲得這個權利

  • 7處,如果當前線程獲得了權利,則進入8處,否則,會繼續下一次循環

  • 8處,再次去redis,獲取緩存,如果不為空,則進入9處

  • 9處,查庫,設置緩存

  • 10處,修改field:initControl為0,表示退出初始化

這裏的代碼,整體和hashmap中的initTable是一模一樣的。

如何測試

上面的方案,怎麼測試沒問題呢?我寫了一段測試代碼。

    ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 100,
            60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000), new RejectedExecutionHandler() 	{
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            log.info("discard:{}",r);
        }
    });
	
	@RequestMapping("/test.do")
    public void test() {
      	// 0
        iUsersService.deleteUser(111L);

        CyclicBarrier barrier = new CyclicBarrier(100);

        for (int i = 0; i < 100; i++) {

            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        barrier.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                    long start = System.currentTimeMillis();
                  	// 1
                    Users users = iUsersService.getUser(111L);
                    log.info("result:{},spent {} ms", users, System.currentTimeMillis() - start);
                }
            });
        }

    }

上面模擬100併發下,獲取緩存。

0處,把緩存刪了,模擬緩存失效

1處,調用方法,獲取緩存。

效果如下:

可以看到,只有一個線程拿到了初始化權利。

源碼位置

https://gitee.com/ckl111/all-simple-demo-in-work-1/tree/master/redis-cache-avalanche

總結

jdk的併發包,寫得真是有水平,大家仔細研究的話,必有收穫。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

Spring系列.依賴注入配置

依賴注入的配置

Spring的依賴注入分為基於構造函數的依賴注入基於setter方法的依賴注入

基於構造函數的依賴注入

        <!-- 通過構造器參數索引方式依賴注入 -->
	<bean id="byIndex" class="cn.javass.spring.chapter3.HelloImpl3">
		<constructor-arg index="0" value="Hello World!"/>
		<constructor-arg index="1" value="1"/>
	</bean>
	<!-- 通過構造器參數類型方式依賴注入 -->
	<bean id="byType" class="cn.javass.spring.chapter3.HelloImpl3">
		<constructor-arg type="java.lang.String" value="Hello World!"/>
		<constructor-arg type="int" value="2"/>
	</bean>
	<!-- 通過構造器參數名稱方式依賴注入 -->
	<bean id="byName" class="cn.javass.spring.chapter3.HelloImpl3">
		<constructor-arg name="message" value="Hello World!"/>
		<constructor-arg name="index" value="3"/>
	</bean>
	<!-- 通過靜態的工廠方法注入 -->
	<bean id="byName" class="cn.javass.spring.chapter3.HelloImpl3" factory-method="getBean">
		<constructor-arg name="message" value="Hello World!"/>
		<constructor-arg name="index" value="3"/>
	</bean>

基於setter方法的依賴注入

    <bean class="...HelloImpl4">
	    <property name="message" value="Hello"/>
	    <property name="index" value="1"/>  //value中的值全部是字符串形式,如果轉換出錯會報異常
	</bean>
	<bean id="Hello2" class="com.csx.personal.web.services.HelloImpl2">
		<property name="msg" ref="message"/>  //msg屬性是一個類對象
	</bean>

循環依賴:創建Bean A需要Bean B,創建Bean B需要Bean C,創建Bean C需要Bean A 這樣就形成了循環依賴。 Spring的解決方案:Spring創建Bean的時候會維護一個池,在創建A的時候會去池中查找A是否在池子中,假如發現就拋出循環依賴異常。

避免依賴注入時的循環依賴:可以使用setter方式注入,不要使用構造器形式的注入。

依賴配置常見列子

常量值注入配置

    <bean id="myDataSource" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <!-- results in a setDriverClassName(String) call -->
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost:3306/mydb"/>
        <property name="username" value="root"/>
        <property name="password" value="masterkaoli"/>
    </bean>

注入其他Bean

這邊分別給出了一個引用當前容器和引用父容器中Bean的列子。

   <bean id="Hello2" class="com.csx.personal.web.services.HelloImpl2">
	<property name="msg">        //msg屬性是一個類對象
            <ref  bean="message"/>   //引用同一個容器中id="message"的Bean
       </property>
   </bean>
    
    <!-- 引用父容器中的Bean -->
    <!-- in the parent context -->
    <bean id="accountService" class="com.something.SimpleAccountService">
    <!-- insert dependencies as required as here -->
    </bean>
 
    <!-- in the child (descendant) context -->
    <bean id="accountService" <!-- bean name is the same as the parent bean -->
    class="org.springframework.aop.framework.ProxyFactoryBean">
    <property name="target">
        <ref parent="accountService"/> <!-- notice how we refer to the parent bean -->
    </property>
    <!-- insert other configuration and dependencies as required here -->
    </bean>

注入內部Bean

內部bean:這種bean一般只讓某個外部bean使用(和內部類相似),不讓容器中的其他Bean使用。

    <bean id="outer" class="...">
        <property name="target">
             <!-- this is the inner bean -->
            <bean class="com.example.Person"> 
                <property name="name" value="Fiona Apple"/>
                <property name="age" value="25"/>
            </bean>
        </property>
   </bean>

集合的注入

集合類的注入建議使用util命名空間

    <util:map id="myMap" key-type="java.lang.String" value-type="java.lang.String">
        <entry key="key1" value="chen"/>
        <entry key="key2" value="zhao"/>
    </util:map>

    <util:list id="myList" value-type="java.lang.String">
        <value>chen</value>
        <value>zhao</value>
    </util:list>
    
    <util:set id="mySet" value-type="java.lang.String" scope="singleton">
        <value>chen</value>
        <value>zhao</value>
    </util:set>
    
    <util:properties id="myProp" location="classpath:xx.properties"/>

null值和空字符串的注入

   <bean class="...HelloImpl4">
        <property name="message"><null/></property> //null值
        <property name="index" value=""/>  //空字符串
  </bean>

使用depends-on屬性

depends-on屬性用來指定bean的初始化順序。這個屬性只對scope是單列的bean生效

    <!--在實例化beanOne之前先實例化manager和accountDao這兩個bean-->
    <bean id="beanOne" class="ExampleBean" depends-on="manager,accountDao">
	<property name="manager" ref="manager" />
    </bean>
    <bean id="manager" class="ManagerBean" />
    <bean id="accountDao" class="x.y.jdbc.JdbcAccountDao" />

懶加載

bean的定義中有一個lazy-init這個屬性,用來設置單列bean在容器初始化后是否實例化這個bean。默認情況下容器是會實例化所有單例bean的,我們也建議這麼做,因為這樣能在容器初始化階段就發現bean配置是否正確。如果一個Bean按照下面的設置,lazy-init被設置為true那麼它不會被容器預初始化,只有在被使用的時候才被初始化。但是如果有一個單列類依賴了這個Bean,那麼這個被設置成懶加載的Bean還是會被預初始化。

   <bean id="lazy" class="com.something.ExpensiveToCreateBean" lazy-init="true"/>

如果想設置全局的單例Bean都不要預先初始化,那麼可以在xml中做如下設置:

    <beans default-lazy-init="true">
        <!-- no beans will be pre-instantiated... -->
    </beans>

Autowiring相關

當我們要往一個bean的某個屬性里注入另外一個bean,我們會使用property +ref標籤的形式。但是對於大型項目,假設有一個bean A被多個bean引用注入,如果A的id因為某種原因修改了,那麼所有引用了A的bean的ref標籤內容都得修改,這時候如果使用autowire=”byType”,那麼引用了A的bean就完全不用修改了。

 <!--autowire的用法如下,對某個Bean配置autowire模式 -->
 <!--和給Bean的屬性添加@AutoWired註解的效果一致-->
 <bean id="auto" class="example.autoBean" autowire="byType"/>

autowire的幾種模式:

  • no模式:也是默認模式,這種模式下不會進行自動屬性注入,需要我們自己通過value或者ref屬性進行配置;
  • byName模式:通過屬性的名稱自動裝配,Spring會在容器中查找名稱與bean屬性名稱一致的bean,並自動注入到bean屬性中。當然bean的屬性需要有setter方法。例如:bean A有個屬性master,master的setter方法就是setMaster,A設置了autowire=”byName”,那麼Spring就會在容器中查找名為master的bean通過setMaster方法注入到A中;
  • byType:通過類型自動裝配(注入)。Spring會在容器中查找類(Class)與bean屬性類一致的bean,並自動注入到bean屬性中,如果容器中包含多個這個類型的bean,Spring將拋出異常。如果沒有找到這個類型的bean,那麼注入動作將不會執行;
  • constructor:類似於byType,但是是通過構造函數的參數類型來匹配。假設bean A有構造函數A(B b, C c),那麼Spring會在容器中查找類型為B和C的bean通過構造函數A(B b, C c)注入到A中。與byType一樣,如果存在多個bean類型為B或者C,則會拋出異常。但時與byType不同的是,如果在容器中找不到匹配的類的bean,將拋出異常,因為Spring無法調用構造函數實例化這個bean;
  • default : 採用父級標籤(即beans標籤的default-autowire屬性)的配置。

需要注意的是上面這5中方式注入都需要我們提供相應的setter方法,通過@Autowired的方式不需要提供相應的setter方法。

自動裝配的缺點

  • 屬性和構造函數參數設置中的顯式依賴項會覆蓋自動裝配;

將Bean排除自動裝配

如果按照下面的方式配置了Bean,那麼這個Bean將不會作為自動裝配的候選Bean。但是autowire-candidate自會對byType形式的自動注入生效,如果我們是通過byName的形式進行自動注入,那麼還是能注入這個Bean。

    <bean id="auto" class="example.autoBean" autowire="byType" autowire-candidate="false"/>

如果我們只想讓某些Bean作為自動裝配的候選Bean,那麼可以進行全局設置。如果做了下面的配置,那麼只有id為bean1和bean2的Bean才會成為自動裝配的候選Bean。同時default-autowire-candidates的值支持正則表達式形式。但是強烈建議不要配置這個選項的值,使用默認的配置就行。

    <beans default-autowire-candidates="bean1,bean2">
    </beans>

方法注入(單例依賴原型Bean)

我們在配置bean的時候首先要考慮這個bean是要配置成單例模式還是其他模式。 在我們的應用中,絕大多數類都是單例類。當單例類引用單例類,或者原型類引用原型類、單例類時,我們只要像普通的配置方式就行了。但是當一個單例類引用原型類時,就會出現問題。這種情況可以使用下面的方式進行注入。

   @Service
   @Scope("prototype")
   public class MyService1 {
public void service() {
   System.out.println(this.toString() + ":id");
}
	}
	
	@Service
public abstract class MyService {

    private MyService1 service1;
    public void useService(){
     service1 = createService1();
     service1.service();
    }

    @Lookup("myService1")
    public abstract MyService1 createService1();

}
	
//也可以這樣配置bean
<bean id="serviceC" class="com.csx.demo.springdemo.service.ServiceC">
    <lookup-method bean="serviceD" name="createService"/>
</bean>
<bean id="serviceD" class="com.csx.demo.springdemo.service.ServiceD"     scope="prototype"/>

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化