WPF 修改屏幕DPI,會觸發控件重新加載Unload/Load

修改屏幕DPI,會觸發控件的Unloaded/Loaded

現象/重現案例

這裏簡單介紹下,修改屏幕DPI,觸發Unloaded/Loaded的神奇案例

1. 我們新建一個窗口,添加一個UserControl1,然後在UserControl1中添加UserControl2

 1 <Window x:Class="WPFUnloadedTriggerTest.MainWindow"
 2         xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
 3         xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
 4         xmlns:d="http://schemas.microsoft.com/expression/blend/2008"
 5         xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006"
 6         xmlns:local="clr-namespace:WPFUnloadedTriggerTest"
 7         mc:Ignorable="d"
 8         Title="MainWindow" Height="450" Width="800">
 9     <local:UserControl1></local:UserControl1>
10 </Window>
11 ------------------------------我是分隔線-----------------------------------
12 <UserControl x:Class="WPFUnloadedTriggerTest.UserControl1"
13              xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
14              xmlns:x="http://schemas.microsoft.com/winfx/2006/xaml"
15              xmlns:mc="http://schemas.openxmlformats.org/markup-compatibility/2006" 
16              xmlns:d="http://schemas.microsoft.com/expression/blend/2008" 
17              xmlns:local="clr-namespace:WPFUnloadedTriggerTest"
18              mc:Ignorable="d" 
19              d:DesignHeight="450" d:DesignWidth="800">
20     <local:UserControl2></local:UserControl2>
21 </UserControl>

View Code

2. 显示窗口后,修改DPI比例

3. 設置完后,會觸發Unloaded/Loaded重新加載

Unloaded的觸發順序是UserControl1–>UserControl2,Window並不會觸發Unloaded事件!

是不是詭異?我們繼續。。。

 4. Window我們添加一個ControlTemplate模塊

1     <Window.Template>
2         <ControlTemplate TargetType="Window">
3             <Border>
4                 <AdornerDecorator>
5                     <ContentPresenter />
6                 </AdornerDecorator>
7             </Border>
8         </ControlTemplate>
9     </Window.Template>

 再重複2、3步驟,Unloaded的觸發順序變了:

觸發UserControl2的Unloaded,Window、UserControl1並不會觸發Unloaded事件!

問題分析

第2步驟中修改DPI后,Unloaded事件不一定觸發。如何必現呢?

將窗口靠近到任務欄上方,再修改文本比例。

 我們查看調用堆棧,貌似是系統給窗口發送消息然後調用BroadcastUnloadedEvent事件,觸發Unload

 所以應該是修改DPI,窗口寬高超出了當前屏幕尺寸範圍,系統對UserControl的視覺樹進行重新加載布局。

至於窗口沒有觸發Unloaded、以及在窗口添加以上模塊後下一級子控件也沒有觸發Unloaded事件的原因,暫不了解

而對WPF-Unloaded/Loaded的已知情況如下:

  • FrameworkElement, 第一次加載显示時,會觸發Loaded。元素被釋放時,會觸發Unloaded。窗口Show/Close時,視覺樹變化都會觸發加載事件
  • MenuItem, 在FrameworkElement基礎上,每次和隱藏MenuItem時,會額外觸發Load/Unloaded
  • TabControl,當你選中一個tabItem時會觸發Loaded,當你取消選中一個tabItem時會觸發Unloaded,所以切換Tab時必定有一個Loaded一個Unloaded。
  • Expander,每次被Expanded擴展時會引發Loaded,但當隱藏時不會引發Unloaded。

 以上問題的解決方案?暫時沒有解決方案,只有規避措施,不要過於依賴於Unload/Loaded,而且使用了Unload/Loaded時也要添加註銷機制,防止重入

我在github提了個issue:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※公開收購3c價格,不怕被賤賣!

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

使用Docker搭建maven私服 及常規使用方法

安裝-登錄-配置

下載鏡像
docker pull sonatype/nexus3
運行
docker run -d -p 9998:8081 --name nexus --restart=always sonatype/nexus3

進入容器中查看密碼是多少

docker exec -it 容器名/容器id /bin/bash

根據上圖的提示進入到指定的目錄,查看密碼是啥

繼續訪問, 修改密碼

修改私服的中央倉庫位置,如果嫌國外的站點太慢了, 我們就將其修改成阿里雲,修改方式就是替換一下鏈接就ok

創建hosted類型的倉庫

選擇創建的倉庫類型是hosted類型,為什麼非得選擇這種類型呢? 如下錶中解密

項目 具體說明
hosted 本地存儲。像官方倉庫一樣提供本地私庫功能
proxy 提供代理其它倉庫的類型
group 組類型,能夠組合多個倉庫為一個地址提供服務

繼續創建

創建一個私服的帳號,然後在我的windows本中本地maven添加進去私服的新創建的這個用戶的信息, 進而可以使用這個用戶往私服中發布jar包

填寫用戶的信息

找到本機的settings.xml配置文件, 將我們剛剛創建的私服添加進去

ok, 下面去idea中發布jar包

發布

首先是將連接私服的用戶信息配置進配置文件

  1. id 就是上圖中的id
  2. url: 在nexus可視化界面中找到我們在上面創建的倉庫可以找到url

準備腳本

 <!--添加build依賴,表示可以發布jar-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-deploy-plugin</artifactId>
                <version>2.8</version>
            </plugin>
            <!--發布源碼的插件-->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <version>2.2.1</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

發布命令:

mvn deploy

踩坑

  • 再發布之前檢查一下idea中關於maven的配置,使用我們剛才修改的settings.xml配置文件 , 不然這就是個坑,會一直deploy失敗
  • 上面的版本一定得和我們創建的倉庫的類型對應起來, 否則會報錯失敗

發布成果后我們繼續查看結果, 可

詳細結果

拉取使用

添加如下的在pom文件中依賴就ok

<dependency>
  <groupId>com.changwu</groupId>
  <artifactId>lawyer-eureka</artifactId>
  <version>1.0-RELEASE</version>
</dependency>
 <repository>
     <id>changwu</id>
     <name>lawyer-lover-release</name>
     <url>http://139.x.xx.235:9998/repository/lawyer-lover-release/</url>
</repository>

歡迎關注我的博客, 我將會把整理的docker(從入門到部署微服務)分享全套筆記

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

軟件開發要質量還是要效率?

質量和效率似乎永遠都是一對冤家,儘管我們都希望既有質量,又有效率。

把“質量”當做宗旨的企業,通常都有一系列的規章制度,甚至是繁重且冗餘的流程用來約束軟件開發過程中種種“有意”或“無意”的威脅軟件質量的行為。

把“效率”當做宗旨的企業,通常其內部並無嚴格的規章制度,甚至寬鬆到一個人都可以輕鬆地完成從刪庫到跑路。

從事IT行業的相關人員大多知道,軟件開發不同於一般性的勞動,它並不能單純地增加人手就能縮減開發周期,也就是說一個軟件1個人開發需要10天,這並不意味着10個人就可以1天開發完成。並且在軟件開發的過程中,由於需要“適應市場的快速發展”,常常伴隨需求變更等不可預知的問題。也就是在前期所做的工作可能因為某個需求而全部推倒重來。

下面從要質量還是要效率兩個方面來闡述,不同的側重點所帶來的的問題。

我們首先假設,公理P1:作為IT行業的從業者(開發、測試、產品等)都知道,軟件開發具有一定的不可預知性

那麼在這個前提下,傾向於“質量”的企業通常情況下有以下做法:

  • 通過規章制度讓軟件開發具有一定的可預知性

讓軟件開發具有一定的可預知性,這種方式有很多種實現,比較常見的手段是讓需求變更的成本上升。一旦進入開發階段(含設計階段),需求不得隨意變更,這種方式對開發人員相對比較友好,開發人員不再被隨意變更的需求所打擾,但同時也對產品經理提出了更多的要求。這要求產品經理需要有高超的業務能力,以及一定的前瞻性。除了讓需求變更的成本上升以外,通常也會在前期做大量的工作,包括需求評審、文檔設計、設計評審等會議,在軟件開發的中後期不斷地進行代碼評審等工作。這一系列的規章制度流程,能使得軟件開發不再隨心所欲,而是有章可循。顯而易見,這樣“傳統”的開發形式,勢必帶來效率的下降。例如我曾經見過有的公司,一年最多發布2個版本。這在如今快速的互聯網發展中是不可接受的。

而傾向於“效率”的企業,也就是通常所說的互聯網公司對於效率的提升通常採取以下手段:

  • 通過縮短開發周期使軟件開發具有一定的可預知性

目前在部分互聯網公司所倡導的“敏捷開發”實際上就是通過縮短開發周期來使軟件具有一定的可預知性。我們在開頭假設了了公理P1,軟件開發具有一定的不可預知性。並且開發周期越長,不可預知性越大。注重質量的公司,可能更傾向於提高需求變更的成本,而注重效率的公司則縮短開發周期。兩者都是為了使得軟件開發變得可控。但兩個不同的方式則導致了兩個不同的傾向。

縮短開發周期的確會讓效率變得更高,起碼能更快的適應市場的需求。那為什麼會說縮短開發周期會使得質量降低呢?

其實這是一個顯而易見的道理,縮短開發周期,理論上來講似乎就能縮短開發時間。10個需求需要做10天,平均1個需求不就只需要1天嗎?那麼我為了提高我的效率,快速響應市場變化,我就採取敏捷開發的方式,這樣不就既滿足了效率,同時也滿足了開發時間,這樣的做法似乎並不會降低軟件開發的質量。這麼想的通常是沒有從事過技術研發的同學。仍然回到公理P1,軟件開發具有一定的不可預知性。我在做當前開發的時候,所採取的的設計基本上只適用於當前的業務模型,對於未來幾乎一無所知。隨着系統不斷地快速迭代,一次又一次的在原有的系統上疊加新的功能修改刪除舊的功能。這對於軟件開發者可以說是災難性的,沒有哪一個系統架構師能遇見未來的所有可能。“天下武功唯快不破”,快是快了,代碼後院也快起火了。

天底下沒有公司敢說我不注重質量,我只注重效率。無論是什麼公司都會採取以下手段去保證軟件質量。

  • 通過一定的經濟利益懲罰手段

一定的懲罰手段,簡單粗暴地將開發人員的bug數與績效掛鈎。不過直接將bug數與績效掛鈎的情況比較少,大多情況是bug的reopen次數,以及是否有新引入的bug。其中reopen是較為常見的一種懲罰手段,同樣也能較好地推動軟件質量提升。

事實上,並沒有哪一種絕對完美的兼顧了質量和效率,對於目前的互聯網公司大多所採用的是快速迭代的開發方式。但這並不代表採用這種方式的公司質量就一定低下。

“快速適應市場的變化”這本身也是一種需求,採取快速迭代的方式實際上也是為了滿足這一“需求”。阿里巴巴集團CTO行癲曾談到過,“最早,業務比技術跑的快,技術一直追業務,因為業務增長實在太快了。前兩年我覺得是技術推動業務,特別是人工智能興起的之後,包括我們程序化交易、廣告平台、千人千面、推薦、搜索大量用算法和AI,包括客服等等大量用數據智能在驅動業務”。

“業務比技術跑得快”,這意味着一定一個快速迭代的過程。而後來“技術推動業務”,意味着技術走在了業務的前面,反倒是技術追着業務打。這其中儘管並未提及質量,但我認為技術能推動業務不斷向前跑,一定是因為有堅實的技術後盾做支撐,而堅實的技術後盾也就意味着有超高的軟件質量

所以,在質量與效率的權衡利弊平衡中,不妨回過頭來重新審視技術的重要性。在滿足“市場快速變化”這一需求的同時,不要忘記技術也會負債,欠得越多越不牢靠。

這是一個能給程序員加buff的公眾號 (CoderBuff)

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※高價收購3C產品,價格不怕你比較

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

3c收購,鏡頭 收購有可能以全新價回收嗎?

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

HBase 基本入門篇

目錄

無論是 NoSQL,還是大數據領域,HBase 都是非常”炙熱”的一門數據庫。
本文將對 HBase 做一些基礎性的介紹,旨在入門。

一、簡介

HBase 是一個開源的、面向列的非關係型分佈式數據庫,目前是Hadoop體系中非常關鍵的一部分。
在最初,HBase是基於谷歌的 BigTable 原型實現的,許多技術來自於Fay Chang在2006年所撰寫的Google論文”BigTable”。與 BigTable基於Google文件系統(File System)一樣,HBase則是基於HDFS(Hadoop的分佈式文件系統)之上而開發的。

HBase 採用 Java 語言實現,在其內部實現了BigTable論文提到的一些壓縮算法、內存操作和布隆過濾器等,這些能力使得HBase 在海量數據存儲、高性能讀寫場景中得到了大量應用,如 Facebook 在 2010年11 月開始便一直選用 HBase來作為消息平台的存儲層技術。
HBase 以 Apache License Version 2.0開源,這是一種對商業應用友好的協議,同時該項目當前也是Apache軟件基金會的頂級項目之一。

有什麼特性

  • 基於列式存儲模型,對於數據實現了高度壓縮,節省存儲成本
  • 採用 LSM 機制而不是B(+)樹,這使得HBase非常適合海量數據實時寫入的場景
  • 高可靠,一個數據會包含多個副本(默認是3副本),這得益於HDFS的複製能力,由RegionServer提供自動故障轉移的功能
  • 高擴展,支持分片擴展能力(基於Region),可實現自動、數據均衡
  • 強一致性讀寫,數據的讀寫都針對主Region上進行,屬於CP型的系統
  • 易操作,HBase提供了Java API、RestAPI/Thrift API等接口
  • 查詢優化,採用Block Cache 和 布隆過濾器來支持海量數據的快速查找

與RDBMS的區別

對於傳統 RDBMS 來說,支持 ACID 事務是數據庫的基本能力,而 HBase 則使用行級鎖來保證寫操作的原子性,但是不支持多行寫操作的事務性,這主要是從靈活性和擴展性上做出的權衡。

ACID 要素包含 原子性(Atomicity)、一致性(Consistency)、隔離性(Isolation)以及持久性(Durability)

總體來說, HBase 與傳統關係數據庫的區別,如下錶所示:

特性 HBase RDBMS
硬件架構 類似於 Hadoop 的分佈式集群,硬件成本低廉 傳統的多核系統,硬件成本昂貴
容錯性 由軟件架構實現,由於由多個節點組成,所以不擔心一點或幾點宕機 一般需要額外硬件設備實現 HA 機制
數據庫大小 PB GB、TB
數據排布方式 稀疏的、分佈的多維的 Map 以行和列組織
數據類型 Bytes 豐富的數據類型
事物支持 ACID 只支持單個 Row 級別 全面的 ACID 支持,對 Row 和表
查詢語言 只支持 Java API (除非與其他框架一起使用,如 Phoenix、Hive) SQL
索引 只支持 Row-key,除非與其他技術一起應用,如 Phoenix、Hive 支持
吞吐量 百萬查詢/每秒 數千查詢/每秒

二、數據模型

下面,我們以關係型數據庫的一個數據表來演示 HBase 的不同之處。 先來看下面這張表:

ID 設備名 狀態 時間戳
1 空調 打開 20190712 10:05:01
2 電視機 關閉 20190712 10:05:08

這裏記錄的是一些家庭設備上報的狀態數據(DeviceState),其中包括設備名、狀態、時間戳這些字段。

在 HBase 中,數據是按照列族(Column Family,簡稱CF)來存儲的,也就是說對於不同的列會被分開存儲到不同的文件。
那麼對於上面的狀態數據表來說,在HBase中會被存儲為兩份:

列族1. 設備名

Row-Key CF:Column-Key Timestamp Cell Value
1 DeviceState:設備名 20190712 10:05:01 空調
2 DeviceState:設備名 20190712 10:05:08 電視機

列族2. 狀態

Row-Key CF:Column-Key Timestamp Cell Value
1 DeviceState:狀態 20190712 10:05:01 打開
2 DeviceState:狀態 20190712 10:05:08 關閉

這裏Row-key是唯一定位數據行的ID字段,而Row-key 加上 CF、Column-Key,再加上一個時間戳才可以定位到一個單元格數據。
其中時間戳用來表示數據行的版本, 在HBase中默認會有 3 個時間戳的版本數據,這意味着對同一條數據(同一個Rowkey關聯的數據)進行寫入時,最多可以保存3個版本。

在查詢某一行的數據時,HBase需要同時從兩個列族(文件)中進行查找,最終將結果合併后返回給客戶端。 由此可見如果列族太多,則會影響讀取的性能,在設計時就需要做一些權衡。

由此可見,HBase的使用方式與關係型數據庫是大不相同的,在使用 HBase 時需要拋棄許多關係型數據庫的思維及做法,比如強類型、二級索引、表連接、觸發器等等。

然而 HBase 的靈活性及高度可伸縮性卻是傳統 RDBMS 無法比擬的。

三、安裝HBase

單機環境安裝

  1. 準備JDK環境

確保環境上JDK已經裝好,可執行java -version確認:

host:/home/hbase # java -version
openjdk version "1.8.0_201"
OpenJDK Runtime Environment (build 1.8.0_201-Huawei_JDK_V100R001C00SPC060B003-b10)
OpenJDK 64-Bit Server VM (build 25.201-b10, mixed mode)
  1. 下載軟件

官網的下載地址頁面:

選擇合適的版本,比如1.4.10。 下載后解壓:

wget http://archive.apache.org/dist/hbase/2.1.5/hbase-2.1.5-bin.tar.gz
tar -xzvf hbase-2.1.5-bin.tar.gz
mkdir -p /opt/local
mv hbase-2.1.5 /opt/local/hbase

配置HBase執行命令路徑:

export HBASE_HOME=/opt/local/hbase
export PATH=$PATH:$HBASE_HOME/bin
  1. 配置軟件

vim conf/hbase-env.sh

#JDK安裝目錄
export JAVA_HOME=/usr/local/jre1.8.0_201
#配置hbase自己管理zookeeper
export HBASE_MANAGES_ZK=true

vim conf/hbase-site.xml

<configuration>

  <!-- zookeeper端口  -->
  <property>
      <name>hbase.zookeeper.property.clientPort</name>
      <value>2182</value>                                                                                                                                           
  </property>

  <!--  HBase 數據存儲目錄 -->
  <property>
    <name>hbase.rootdir</name>
    <value>file:///opt/local/hbase/data</value>
  </property>

  <!-- 用於指定 ZooKeeper 數據存儲目錄 -->
  <property>
    <name>hbase.zookeeper.property.dataDir</name>
    <value>/opt/local/hbase/data/zookeeper</value>
  </property>

  <!-- 用於指定臨時數據存儲目錄 -->
  <property>
    <name>hbase.tmp.dir</name>
    <value>/opt/local/hbase/temp/hbase-${user.name}</value>
  </property>
</configuration>

其中 hbase.rootdir 和 hbase.zookeeper.property.dataDir 都用來指定數據存放的目錄,默認情況下hbase會使用/tmp目錄,這顯然是不合適的。
配置了這兩個路徑之後,hbase會自動創建相應的目錄。

關於更多的參數設定可

  1. 啟動軟件
start-hbase.sh

此時查看 logs/hbase-root-master-host-xxx.log,如下:

2019-07-11 07:37:23,654 INFO  [localhost:33539.activeMasterManager] hbase.MetaMigrationConvertingToPB: hbase:meta doesn't have any entries to update.
2019-07-11 07:37:23,654 INFO  [localhost:33539.activeMasterManager] hbase.MetaMigrationConvertingToPB: META already up-to date with PB serialization
2019-07-11 07:37:23,664 INFO  [localhost:33539.activeMasterManager] master.AssignmentManager: Clean cluster startup. Assigning user regions
2019-07-11 07:37:23,665 INFO  [localhost:33539.activeMasterManager] master.AssignmentManager: Joined the cluster in 11ms, failover=false
2019-07-11 07:37:23,672 INFO  [localhost:33539.activeMasterManager] master.TableNamespaceManager: Namespace table not found. Creating...

檢查進程情況,發現進程已經啟動

ps -ef |grep hadoop
root     11049 11032  2 07:37 pts/1    00:00:20 /usr/local/jre1.8.0_201/bin/java -Dproc_master -XX:OnOutOfMemoryError=kill -9 %p -XX:+UseConcMarkSweepGC -XX:PermSize=128m -XX:MaxPermSize=128m -XX:ReservedCodeCacheSize=256m -Dhbase.log.dir=/opt/local/hbase/logs -Dhbase.log.file=hbase-root-master-host-192-168-138-148.log -Dhbase.home.dir=/opt/local/hbase -Dhbase.id.str=root -Dhbase.root.logger=INFO,RFA -Dhbase.security.logger=INFO,RFAS org.apache.hadoop.hbase.master.HMaster start
root     18907 30747  0 07:50 pts/1    00:00:00 grep --color=auto hadoop

通過JPS(JDK自帶的檢查工具) 可以看到當前啟動的Java進程:

# jps
5701 Jps
4826 HMaster
1311 jar

查看 data目錄,發現生成了對應的文件:

host:/opt/local/hbase/data # ls -lh .
total 36K
drwx------. 4 root root 4.0K Jul 11 08:08 data
drwx------. 4 root root 4.0K Jul 11 08:08 hbase
-rw-r--r--. 1 root root   42 Jul 11 08:08 hbase.id
-rw-r--r--. 1 root root    7 Jul 11 08:08 hbase.version
drwx------. 2 root root 4.0K Jul 11 08:08 MasterProcWALs
drwx------. 2 root root 4.0K Jul 11 08:08 oldWALs
drwx------. 3 root root 4.0K Jul 11 08:08 .tmp
drwx------. 3 root root 4.0K Jul 11 08:08 WALs
drwx------. 3 root root 4.0K Jul 11 08:08 zookeeper

關於運行模式
HBase啟動時默認會使用單機模式,此時 Zookeeper和 HMaster/RegionServer 會運行在同一個JVM中。
以standalone模式啟動的HBase會包含一個HMaster、RegionServer、Zookeeper實例,此時 HBase 會直接使用本地文件系統而不是HDFS。

通過將 conf/hbase-site.xml中的 hbase.cluster.distributed 配置為true,就是集群模式了。
在這個模式下,你可以使用分佈式環境進行部署,或者是”偽分佈式”的多進程環境。

<configuration>
  <property>
    <name>hbase.cluster.distributed</name>
    <value>true</value>
  </property>
</configuration>

需要注意的是,如果以standalone啟動的話,HMaster、RegionServer端口都是隨機的,無法通過配置文件指定。

四、基本使用

打開HBase Shell

hbase shell

執行status命令

Version 2.1.5, r76ab087819fe82ccf6f531096e18ad1bed079651, Wed Jun  5 16:48:11 PDT 2019

hbase(main):001:0> status
1 active master, 0 backup masters, 1 servers, 0 dead, 2.0000 average load

這表示有一個Master在運行,一個RegionServer,每個RegionServer包含2個Region。

表操作

  • 創建DeviceState表
hbase(main):002:0> create "DeviceState", "name:c1", "state:c2"

=> Hbase::Table - DeviceState

此時,已經創建了一個DeviceState表,包含name(設備名稱)、state(狀態)兩個列。

查看錶信息:

hbase(main):003:0> list
TABLE
DeviceState
1 row(s) in 0.0090 seconds

=> ["DeviceState"]

hbase(main):003:0> describe "DeviceState"
Table DeviceState is ENABLED
DeviceState
COLUMN FAMILIES DESCRIPTION
{NAME => 'name', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSIO
N => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
{NAME => 'state', BLOOMFILTER => 'ROW', VERSIONS => '1', IN_MEMORY => 'false', KEEP_DELETED_CELLS => 'FALSE', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', COMPRESSI
ON => 'NONE', MIN_VERSIONS => '0', BLOCKCACHE => 'true', BLOCKSIZE => '65536', REPLICATION_SCOPE => '0'}
2 row(s) in 0.0870 seconds
  • 寫入數據

通過下面的命令,向DeviceState寫入兩條記錄,由於有兩個列族,因此需要寫入四個單元格數據:

put "DeviceState", "row1", "name", "空調"
put "DeviceState", "row1", "state", "打開"
put "DeviceState", "row2", "name", "電視機"
put "DeviceState", "row2", "state", "關閉"
  • 查詢數據

查詢某行、某列

hbase(main):012:0> get "DeviceState","row1"
COLUMN                                      CELL
 name:                                      timestamp=1562834473008, value=\xE7\x94\xB5\xE8\xA7\x86\xE6\x9C\xBA
 state:                                     timestamp=1562834474630, value=\xE5\x85\xB3\xE9\x97\xAD
1 row(s) in 0.0230 seconds

hbase(main):013:0> get "DeviceState","row1", "name"
COLUMN                                      CELL
 name:                                      timestamp=1562834473008, value=\xE7\x94\xB5\xE8\xA7\x86\xE6\x9C\xBA
1 row(s) in 0.0200 seconds

掃描表

hbase(main):026:0> scan "DeviceState"
ROW                                         COLUMN+CELL
 row1                                       column=name:, timestamp=1562834999374, value=\xE7\xA9\xBA\xE8\xB0\x83
 row1                                       column=state:, timestamp=1562834999421, value=\xE6\x89\x93\xE5\xBC\x80
 row2                                       column=name:, timestamp=1562834999452, value=\xE7\x94\xB5\xE8\xA7\x86\xE6\x9C\xBA
 row2                                       column=state:, timestamp=1562835001064, value=\xE5\x85\xB3\xE9\x97\xAD
2 row(s) in 0.0250 seconds

查詢數量

hbase(main):014:0> count "DeviceState"
2 row(s) in 0.0370 seconds

=> 1
  • 清除數據

刪除某列、某行

delete "DeviceState", "row1", "name"
0 row(s) in 0.0080 seconds

hbase(main):003:0> deleteall "DeviceState", "row2"
0 row(s) in 0.1290 seconds

清空整個表數據

hbase(main):021:0> truncate "DeviceState"
Truncating 'DeviceState' table (it may take a while):
 - Disabling table...
 - Truncating table...
0 row(s) in 3.5060 seconds

刪除表(需要先disable)

hbase(main):006:0> disable "DeviceState"
0 row(s) in 2.2690 seconds

hbase(main):007:0> drop "DeviceState"
0 row(s) in 1.2880 seconds

五、FAQ

  • 啟動時提示 ZK 端口監聽失敗:
    Could not start ZK at requested port of 2181. ZK was started at port: 2182. Aborting as clients (e.g. shell) will not be able to find this ZK quorum

原因
HBase需要啟動Zookeeper,而本地的2181端口已經被啟用(可能有其他Zookeeper實例)

解決辦法
conf/hbase-site.xml中修改hbase.zookeeper.property.clientPort的值,將其修改為2182,:

<configuration>
  <property>
      <name>hbase.zookeeper.property.clientPort</name>
      <value>2182</value>                                                                                                                                           
  </property>
</configuration>
  • 啟動HBase Shell時提示java.lang.UnsatisfiedLinkError

原因
在執行hbase shell期間,JRuby會在“java.io.tmpdir”路徑下創建一個臨時文件,該路徑的默認值為“/tmp”。如果為“/tmp”目錄設置NOEXEC權限,然後hbase shell會啟動失敗並拋出“java.lang.UnsatisfiedLinkError”錯誤。

解決辦法

  1. 取消/tmp的noexec權限(不推薦)
  2. 設置java.io.tmpdir變量,指向可用的路徑,編輯conf/hbase-env.sh文件:
export HBASE_TMP_DIR=/opt/local/hbase/temp
export HBASE_OPTS="-XX:+UseConcMarkSweepGC -Djava.io.tmpdir=$HBASE_TMP_DIR"

參考文檔

HBase 官方權威指南

HBase 單機模式搭建

HBase 深入淺出
較詳細介紹了HBase的由來以及特性,文中提供了HBase集群、存儲機制的一些簡介,非常適合入門閱讀

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※帶您來了解什麼是 USB CONNECTOR  ?

平板收購,iphone手機收購,二手筆電回收,二手iphone收購-全台皆可收購

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

※綠能、環保無空污,成為電動車最新代名詞,目前市場使用率逐漸普及化

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

PowerMock學習(六)之Mock Final的使用

Mock Final

mockfinal相對來說就比較簡單了,使用powermock來測試使用final修飾的method或class,比較簡單,接口調用部分,還是service調用dao。

對於接口及場景這裏就不細說了,特別簡單。

service層

具體代碼示例如下:

package com.rongrong.powermock.mockfinal;

/**
 * @author rongrong
 * @version 1.0
 * @date 2019/11/27 21:29
 */
public class StudentFinalService {

    private StudentFinalDao studentFinalDao;

    public StudentFinalService(StudentFinalDao studentFinalDao) {
        this.studentFinalDao = studentFinalDao;
    }

    public void createStudent(Student student) {
        studentFinalDao.isInsert(student);
    }
}

dao層

為了模擬測試,我在dao層的類加了一個final關鍵字進行修飾,也就是這個類不允許被繼承了。

具體代碼如下:

package com.rongrong.powermock.mockfinal;


/**
 * @author rongrong
 * @version 1.0
 * @date 2019/11/27 21:20
 */
final public class StudentFinalDao {

    public Boolean isInsert(Student student){
        throw new UnsupportedOperationException();
    }
}

進行單元測試

為了區分powermock與Easymock的區別,我們先採用EasyMock測試,這裏先忽略EasyMock的用法,有興趣的同學可自行去嘗試學習。

使用EasyMock進行測試

具體代碼示例如下:

    @Test
    public void testStudentFinalServiceWithEasyMock(){
        //mock對象
        StudentFinalDao studentFinalDao = EasyMock.createMock(StudentFinalDao.class);
        Student student = new Student();
        //mock調用,默認返回成功
        EasyMock.expect(studentFinalDao.isInsert(student)).andReturn(true);
        EasyMock.replay(studentFinalDao);
        StudentFinalService studentFinalService = new StudentFinalService(studentFinalDao);
        studentFinalService.createStudent(student);
        EasyMock.verify(studentFinalDao);
    }

我們先來運行下這個單元測試,會發現運行報錯,具體如下圖显示:

 

 很明顯由於有final關鍵字修飾后,導致不能讓測試成功,我們可以刪除final關鍵再來測試一下,結果發現,測試通過。

使用PowerMock進行測試

具體代碼示例如下:

package com.rongrong.powermock.mockfinal;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

/**
 * @author rongrong
 * @version 1.0
 * @date 2019/11/27 22:10
 */
@RunWith(PowerMockRunner.class)
@PrepareForTest(StudentFinalDao.class)
public class TestStudentFinalService {

    @Test
    public void testStudentFinalServiceWithPowerMock(){
        StudentFinalDao studentFinalDao = PowerMockito.mock(StudentFinalDao.class);
        Student student = new Student();
        PowerMockito.when(studentFinalDao.isInsert(student)).thenReturn(true);
        StudentFinalService studentFinalService = new StudentFinalService(studentFinalDao);
        studentFinalService.createStudent(student);
        Mockito.verify(studentFinalDao).isInsert(student);
    }
}

運行上面的單元測試時,會發現運行通過!!

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

※想要讓你的商品在網路上成為最夯、最多人討論的話題?

※高價收購3C產品,價格不怕你比較

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!

Java學習筆記 線程池使用及詳解

有點笨,參考了好幾篇大佬們寫的文章才整理出來的筆記….

字面意思上解釋,線程池就是裝有線程的池,我們可以把要執行的多線程交給線程池來處理,和連接池的概念一樣,通過維護一定數量的線程池來達到多個線程的復用。

好處

多線程產生的問題

一般我們使用到多線程的編程的時候,需要通過new Thread(xxRunnable).start()創建並開啟線程,我們可以使用多線程來達到最優效率(如多線程下載)。

但是,線程不是越多就越好,線程過多,創建和銷毀就會消耗系統的資源,也不方便管理。

除此之外,多線程還會造成併發問題,線程併發數量過多,搶佔系統資源從而導致阻塞。

線程池優點

我們將線程放入線程池,由線程池對線程進行管理,可以對線程池中緩衝的線程進行復用,這樣,就不會經常去創建和銷毀線程了,從而省下了系統的資源。

線程池能夠有效的控制線程併發的數量,能夠解決多線程造成的併發問題。

除此之外,線程池還能夠對線程進行一定的管理,如延時執行、定時循環執行的策略等

線程池實現

線程池的實現,主要是通過這個類ThreadPoolExecutor,其的構造參數非常長,我們先大概了解,之後再進行詳細的介紹。

public ThreadPoolExecutor(int corePoolSize,
    int maximumPoolSize,long keepAliveTime,
    TimeUnit unit,BlockingQueue workQueue,
    RejectedExecutionHandler handler)
  • corePoolSize:線程池核心線程數量
  • maximumPoolSize:線程池最大線程數量
  • keepAliverTime:當活躍線程數大於核心線程數時,空閑的多餘線程最大存活時間
  • unit:存活時間的單位
  • workQueue:存放線程的工作隊列
  • handler:超出線程範圍和隊列容量的任務的處理程序(拒絕策略)

這裏大概簡單說明一下線程池的運行流程:

當線程被添加到線程池中,如果線程池中的當前的線程數量等於線程池定義的最大核心線程數量(corePoolSize)了,此線程就會別放入線程的工作隊列(workQueue)中,等待線程池的調用。

Java提供了一個工具類Excutors,方便我們快速創建線程池,其底層也是調用了ThreadPoolExecutor

不過阿里巴巴Java規範中強制要求我們應該通過ThreadPoolExecutor來創建自己的線程池,使用Excutors容易造成OOM問題。

所以,我們先從Excutors開始學習,之後在對ThreadPoolExecutor進行詳細的講解

Excutors

由於Excutors是工具類,所以下面的介紹的都是其的靜態方法,如果是比較線程數目比較少的小項目,可以使用此工具類來創建線程池

PS:把線程提交給線程池中,有兩種方法,一種是submit,另外一種則是execute

兩者的區別:

  1. execute沒有返回值,如果不需要知道線程的結果就使用execute方法,性能會好很多。
  2. submit返回一個Future對象,如果想知道線程結果就使用submit提交,而且它能在主線程中通過Future的get方法捕獲線程中的異常

線程池可以接收兩種的參數,一個為Runnable對象,另外則是Callable對象

Callable是JDK1.5時加入的接口,作為Runnable的一種補充,允許有返回值,允許拋出異常。

主要的幾個靜態方法:

方法 說明
newFixedThreadPool(int nThreads) 創建固定大小的線程池
newSingleThreadExecutor() 創建只有一個線程的線程池
newCachedThreadPool() 創建一個不限線程數上限的線程池,任何提交的任務都將立即執行
newScheduledThreadPool(int nThreads) 創建一個支持定時、周期性或延時任務的限定線程數目的線程池
newSingleThreadScheduledExecutor() 創建一個支持定時、周期性或延時任務的單個線程的線程池

1.newSingleThreadExecutor

創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行,我們可以使用它來達到控制線程順序執行。

控制進程順序執行:

Thread thread1 = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("這是線程1");
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
Thread thread2 = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            System.out.println("這是線程2");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
Thread thread3 = new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            System.out.println("這是線程3");
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
//創建線程池對象
ExecutorService executorService = Executors.newSingleThreadExecutor();
//把線程添加到線程池中
executorService.submit(thread1);
executorService.submit(thread2);
executorService.submit(thread3);

之後出現的結果就是按照順序輸出

2.newFixedThreadPool

創建一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。定長線程池的大小最好根據系統資源進行設置。如Runtime.getRuntime().availableProcessors()

3.newCachedThreadPool

創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程,線程池為無限大,當執行第二個任務時第一個任務已經完成,會復用執行第一個任務的線程,而不用每次新建線程。

代碼:

//創建了一個自定義的線程
public class MyThread extends Thread {
    private int index;

    public MyThread(int index) {
        this.index = index;
    }

    @Override
    public void run() {
        System.out.println(index+" 當前線程"+Thread.currentThread().getName());
    }
}

//創建緩存線程池
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    executorService.execute(new MyThread(i));
    try {
        //這裏模擬等待時間,等待線程池復用回收線程
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

可以看到結果都是使用的同一個線程

4.newScheduledThreadPool

創建一個定長線程池,支持定時、周期性或延時任務執行

延遲1s后啟動線程:

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
scheduledExecutorService.schedule(new MyThread(1),1, TimeUnit.SECONDS);

ThreadPoolExecutor

構造方法

上面提到的那個構造方法其實只是ThreadPoolExecutor類中的一個,ThreadPoolExecutor類中存在有四種不同的構造方法,主要區別就是參數不同。

//五個參數的構造函數
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue)

//六個參數的構造函數-1
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory)

//六個參數的構造函數-2
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          RejectedExecutionHandler handler)

//七個參數的構造函數
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

首先,有個概念需要明白,線程池的最大線程數(線程總數,maximumPoolSize)= 核心線程數(corePoolSize)+非核心線程數

  • corePoolSize:線程池核心線程數量
  • maximumPoolSize:線程池最大線程數量
  • keepAliverTime:當活躍線程數大於核心線程數時,空閑的多餘線程最大存活時間
  • unit:存活時間的單位
  • workQueue:存放線程的工作隊列
  • handler:超出線程範圍和隊列容量的任務的處理程序(拒絕策略)

核心線程和非核心線程有什麼區別呢?

核心線程是永遠不會被線程池丟棄回收(即使核心線程沒有工作),非核心線程則是超過一定時間(keepAliverTime)則就會被丟棄

workQueue

當所有的核心線程都在工作時,新添加的任務會被添加到這個隊列中等待處理,如果隊列滿了,則新建非核心線程執行任務

1.SynchronousQueue:這個隊列接收到任務的時候,會直接提交給線程處理,而不保留它,如果所有線程都在工作怎麼辦?那就新建一個線程來處理這個任務!所以為了保證不出現線程數達到了maximumPoolSize而不能新建線程的錯誤,使用這個類型隊列的時候,maximumPoolSize一般指定成Integer.MAX_VALUE,即無限大

2.LinkedBlockingQueue:這個隊列接收到任務的時候,如果當前線程數小於核心線程數,則新建線程(核心線程)處理任務;如果當前線程數等於核心線程數,則進入隊列等待。由於這個隊列沒有最大值限制,即所有超過核心線程數的任務都將被添加到隊列中,這也就導致了maximumPoolSize的設定失效,因為總線程數永遠不會超過corePoolSize

3.ArrayBlockingQueue:可以限定隊列的長度,接收到任務的時候,如果沒有達到corePoolSize的值,則新建線程(核心線程)執行任務,如果達到了,則入隊等候,如果隊列已滿,則新建線程(非核心線程)執行任務,又如果總線程數到了maximumPoolSize,並且隊列也滿了,則發生錯誤

4.DelayQueue:隊列內元素必須實現Delayed接口,這就意味着你傳進去的任務必須先實現Delayed接口。這個隊列接收到任務時,首先先入隊,只有達到了指定的延時時間,才會執行任務

拒絕策略:

拒絕策略 拒絕行為
AbortPolicy 拋出RejectedExecutionException異常(默認)
DiscardPolicy 不處理,丟棄掉
DiscardOldestPolicy 丟棄執行隊列中等待最久的一個任務,嘗試為新來的任務騰出位置
CallerRunsPolicy 直接由提交任務者執行這個任務

兩種方法設置拒絕策略:

//ThreadPoolExecutor對象的setRejectedExecutionHandler方法設置
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 5, 60, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
//構造方法進行設置,省略

線程池默認的拒絕行為是AbortPolicy,也就是拋出RejectedExecutionHandler異常,該異常是非受檢異常,很容易忘記捕獲。

如果不關心任務被拒絕的事件,可以將拒絕策略設置成DiscardPolicy,這樣多餘的任務會悄悄的被忽略。

ThreadFactory

一個接口類,用來對線程進行設置,需要實現newThread(Runnable r)方法

官方的文檔說明:

newThread此方法一般來初始化線程的優先級(priority),名字(name),守護進程(daemon)或線程組(ThreadGroup)

簡單的例子(讓某個類實現ThreadFactory接口):

@Override
public Thread newThread(Runnable r) {
    Thread thread = new Thread(r);
    thread.setDaemon(true);
    return thread;
}

線程池獲取執行結果

PS:把線程提交給線程池中,有兩種方法,一種是submit,另外一種則是execute

兩者的區別:

  1. execute沒有返回值,如果不需要知道線程的結果就使用execute方法,性能會好很多。
  2. submit返回一個Future對象,如果想知道線程結果就使用submit提交,而且它能在主線程中通過Future的get方法捕獲線程中的異常

線程池可以接收兩種的參數,一個為Runnable對象,另外則是Callable對象

Callable是JDK1.5時加入的接口,作為Runnable的一種補充,允許有返回值,允許拋出異常。

線程池的處理結果、以及處理過程中的異常都被包裝到Future中,並在調用Future.get()方法時獲取,執行過程中的異常會被包裝成ExecutionException,submit()方法本身不會傳遞結果和任務執行過程中的異常。

獲取執行結果的代碼可以這樣寫:

ExecutorService executorService = Executors.newFixedThreadPool(4);
Future<Object> future = executorService.submit(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            //該異常會在調用Future.get()時傳遞給調用者
            throw new RuntimeException("exception in call~");
        }
    });
    
try {
    //獲得返回結果
    Object result = future.get();
    
    
} catch (InterruptedException e) {
  // interrupt
} catch (ExecutionException e) {
  // exception in Callable.call()
  e.printStackTrace();
}

線程池運行流程

一個形象的比喻說明線程池的流程:

規定:

  1. 線程池比作成一家公司
  2. 公司的最大員工數為maximumPoolSize
  3. 最大正式員工數為coolPoolSize(核心線程的總數)
  4. 最大員工數(maximumPoolSize) = 最大正式員工(coolPoolSize)和臨時工(非核心線程)
  5. 單子(任務)可看做為線程
  6. 隊列使用的是ArrayBlockingQueue
  7. 一個員工只能幹一個任務

最開始的時候,公司是沒有一名員工。之後,公司接到了單子(任務),這個時候,公司才去找員工(創建核心線程並讓線程開始執行),這個時候找到的員工就是正式員工了。

公司的聲譽越來越好,於是來了更多的單子,公司繼續招人,直到正式員工數量達到最大的正式員工的數量(核心線程數量已達到最大)

於是,多出來的單子就暫時地存放在了隊列中,都在排隊,等待正式員工們把手頭的工作做完之後,就從隊列中依次取出單子繼續工作。

某天,來了一個新單子,但是這個時候隊列已經滿了,公司為了自己的信譽和聲譽着想,不得已只能去找臨時工(創建非核心線程)來幫忙開始進行工作(負責新單子)

在此之後,又來了新單子,公司繼續去招臨時工為新來的單子工作,直到正式工和臨時工的數量已經達到了公司最大員工數。

這個時候,公司沒有辦法了,只能拒絕新來的單子了(拒絕策略)

此時,正式工和臨時工都是在加班加點去從隊列中取出任務來工作,終於某一天,隊列的已經沒有單子了,市場發展不好,單子越來越少,臨時工很久都不工作了(非核心線程超過了最大存活時間keepAliveTime),公司就把這些臨時工解僱了,直到剩下只有正式員工。

PS:如果也想要解僱正式員工(銷毀核心線程),可以設置ThreadPoolExecutor對象的的allowCoreThreadTimeOut這個屬性為true

個人理解,可能不是很正確,僅供參考!

線程池關閉

方法 說明
shutdown() 不再接受新的任務,之前提交的任務等執行結束再關閉線程池
shutdownNow() 不再接受新的任務,試圖停止池中的任務再關閉線程池,返回所有未處理的線程list列表。

總結

如果是小的Java程序,可以使用Excutors,如果是服務器程序,則使用ThreadPoolExecutor進行自定義線程池的創建

參考鏈接:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

USB CONNECTOR 掌控什麼技術要點? 帶您認識其相關發展及效能

※高價3c回收,收購空拍機,收購鏡頭,收購 MACBOOK-更多收購平台討論專區

※評比前十大台北網頁設計台北網站設計公司知名案例作品心得分享

收購3c瘋!各款手機、筆電、相機、平板,歡迎來詢價!

※智慧手機時代的來臨,RWD網頁設計已成為網頁設計推薦首選

GDG Xi’an DevFest 2019 閃電演講 -《假如我是一個瀏覽器》PPT(經典多圖,建議收藏)

GDG Xi’an DevFest2019演講PPT鏈接:

閃電演講《假如我是一個瀏覽器》PPT鏈接:

關於我的一篇雞湯文,獻給所有努力中的野生前端:

摘要

內容講述了HTML,CSS和JavaScript文件從代碼到瀏覽器中圖形的基本過程,實際上每個階段正好代表了高級前端工程師可以選擇的三大細分方向——架構師,工程化,圖形學。PPT基本上全是圖,引用了一些知名的前端神圖,大部分都是自己一點點做的,畢竟圖的表現力比文字要生動直觀一些,原稿幾乎每一頁都加了備註!!!,有需要的可以在我的博客或者GDG西安官方公眾號獲取到。

作者簡介

請求階段

請求階段從解析DNS開始,它是一個遞歸的過程,可以在Linux系統中使用dig+trace工具進行追蹤查看;查詢到地址后就需要開始建立連接(三次握手建立連接),然後從服務器獲取第一個文件,通常是index.html,獲取到文件后就需要根據響應頭裡的信息進行一些處理,對這塊不太熟悉的同學可以閱讀《圖解Http》一書,強制緩存和協商緩存這一塊是很重要的考點,index.html在解析時可能還會碰到請求其他資源的情況,這時又會引出CDN等等其他話題,本次分享中並未涉及。如果對於前端可用性及資源部署方面感興趣,可以考慮向架構師的方向發展,也就是只將前端應用視為整個鏈路中的一環,嘗試去關注整個鏈路中各個環節,前端工程師切入時並不需要特別關注去解決細枝末節的技術問題,那畢竟需要時間和經驗的積累,請記住你是有夥伴的,我個人比較推薦前端工程師嘗試建設全鏈路的異常監控體系,去了解各個環節有哪些關鍵指標,如何去呈現,如何去判斷異常等等,以盡可能穩定有效的方式把關鍵信息呈現給能解決問題的人。

解析階段

我在分享時已經提及過,解析階段的關鍵詞就是“編譯原理”,前端基礎的HTML,CSS,JS,以及常見的工程化工具例如Webpack,Babel,Eslint等等,全部都是基於編譯原理來運作的,如果從純學術的角度來看,它的確很晦澀,但是從應用的角度來理解,實際上無論是分詞,轉換還是遍歷AST以及最終的代碼生成,實際上都是看得見摸得着的,並不算特別難理解,B站上有很多國內外的《編譯原理》課程錄像,你懂的(B站真的是學習用的)。其中還涉及到了一些基本的數據結構和基礎算法的知識,這裏的知識是對基本功的硬考驗,也就是“設計模式”“數據結構”和“基礎算法”的三座大山,爬山很慢,但真的很值。這一塊的知識可以翻看朱永盛的《Webkit技術內幕》一書,慎重,沒有老司機帶的話這本書很容易看的人懷疑人生。

種樹階段

種樹階段只是戲稱,就是為了不同的目的構建了許許多多的樹和層。HTML解析後生成DOM樹,它表示文檔的結構,CSS在內部優化時也會生成樹,為了將用於渲染的信息整合在一起,兩者被合併生成了RenderObject樹,為了解決層疊順序問題,又在此基礎上生成了RenderLayer層,為了利用硬件加速渲染,又為滿足另一些條件的層生成CompositingLayer合成層,合成層又使用GraphicsLayer來進行後端存儲。概念之多,相對複雜。為了排除干擾,本次分享中並沒有講述Chrome瀏覽器的多進程模型和多線程結構,它們只是為了更好更高效地處理好關鍵渲染步驟,一次性信息量太大反而會影響吸收。

畫畫階段

畫畫階段實際上是指將對象信息通過光柵化處理后得到位圖信息並展示在显示器上的過程,PPT中並沒有涉及,它涉及到很多圖形學相關的知識,基本的WebGL以及Chromium渲染管線方面的知識。對此感興趣的讀者可以掃描下面的二維碼關注我技術博客中系列博文,比較詳細地描述了這部分相關知識。最後提一下,原稿最後一頁的資料在播放模式下都可以直接點擊跳轉,還有每一頁的備註信息如果看不見可能需要手動把畫面向上拖拽縮小一點。

硬廣時間

我的博文集《大史住在大前端》是關於前端基礎的文章,掃下面右邊的二維碼就可以看到,基本都是系列專題,沒有太多關於三大框架或是熱門技術的東西,都是基礎基礎基礎,或許會對你有幫助。最後再次感謝GDGXi’an提供的這次機會,讓我認識了好多好多優秀的大佬和開發者。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

3c收購,鏡頭 收購有可能以全新價回收嗎?

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

賣IPHONE,iPhone回收,舊換新!教你怎麼賣才划算?

如何使用C#調用C++類虛函數(即動態內存調用)

  本文講解如何使用C#調用只有.h頭文件的c++類的虛函數(非實例函數,因為非虛函數不存在於虛函數表,無法通過類對象偏移計算地址,除非用export導出,而gcc默認是全部導出實例函數,這也是為什麼msvc需要.lib,如果你不清楚但希望了解,可以選擇找我擺龍門陣),並以COM組件的c#直接調用(不需要引用生成introp.dll)舉例。

  我們都知道,C#支持調用非託管函數,使用P/Inovke即可方便實現,例如下面的代碼

[DllImport("msvcrt", EntryPoint = "memcpy", CallingConvention = CallingConvention.Cdecl)]
public static extern void memcpy(IntPtr dest, IntPtr src, int count);

不過使用DllImport只能調用某個DLL中標記為導出的函數,我們可以使用一些工具查看函數導出,如下圖

一般會導出的函數,都是c語言格式的。

  C++類因為有多態,所以內存中維護了一個虛函數表,如果我們知道了某個C++類的內存地址,也有它的頭文件,那麼我們就能自己算出想要調用的某個函數的內存地址從而直接call,下面是一個簡單示例

#include <iostream>

class A_A_A {
public:
    virtual void hello() {
        std::cout << "hello from A\n";
    };
};

//typedef void (*HelloMethod)(void*);

int main()
{
    A_A_A* a = new A_A_A();
    a->hello();

    //HelloMethod helloMthd = *(HelloMethod *)*(void**)a;
    
    //helloMthd(a);
    (*(void(**)(void*))*(void**)a)(a);

    int c;
    std::cin >> c;
}

(上文中將第23行註釋掉,然後將其他註釋行打開也是一樣的效果,可能更便於閱讀)
從代碼中大家很容易看出,c++的類的內存結構是一個虛函數表二級指針(數組,多重繼承時可能有多個),每個虛函數表又是一個函數二級指針(數組,多少個虛函數就有多少個指針)。上文中我們假使只知道a是一個類對象,它的第一個虛函數是void (*) (void)類型的,那麼我們可以直接call它的函數。

  接下來開始騷操作,我們嘗試用c#來調用一個c++的虛函數,首先寫一個c++的dll,並且我們提供一個c格式的導出函數用於提供一個new出的對象(畢竟c++的new操作符很複雜,而且實際中我們經常是可以拿到這個new出來的對象,後面的com組件調用部分我會詳細說明),像下面這樣

dll.h

class DummyClass {
private:
    virtual void sayHello();
};

dll.cpp

#include "dll.h"
#include <stdio.h>

void DummyClass::sayHello() {
    printf("Hello World\n");
}

extern "C" __declspec(dllexport) DummyClass* __stdcall newObj() {
    return new DummyClass();
}

我們編譯出的dll長這樣

讓我們編寫使用C#來調用sayHello

using System;
using System.Runtime.InteropServices;

namespace ConsoleApp2
{
    class Program
    {
        [DllImport("Dll1", EntryPoint = "newObj")]
        static extern IntPtr CreateObject();

        [UnmanagedFunctionPointer(CallingConvention.ThisCall)]
        delegate void voidMethod1(IntPtr thisPtr);

        static void Main(string[] args)
        {
            IntPtr dummyClass = CreateObject();
            IntPtr vfptr = Marshal.ReadIntPtr(dummyClass);
            IntPtr funcPtr = Marshal.ReadIntPtr(vfptr);
            voidMethod1 voidMethod = (voidMethod1)Marshal.GetDelegateForFunctionPointer(funcPtr, typeof(voidMethod1));
            voidMethod(dummyClass);

            Console.ReadKey();
        }
    }
}

(因為調用的是c++的函數,所以this指針是第一個參數,當然,不同調用約定時它入棧方式和順序不一樣)
下面有一種另外的寫法

using System;
using System.Reflection;
using System.Reflection.Emit;
using System.Runtime.InteropServices;

namespace ConsoleApp2
{
    class Program
    {
        [DllImport("Dll1", EntryPoint = "newObj")]
        static extern IntPtr CreateObject();

        //[UnmanagedFunctionPointer(CallingConvention.ThisCall)]
        //delegate void voidMethod1(IntPtr thisPtr);

        static void Main(string[] args)
        {
            IntPtr dummyClass = CreateObject();
            IntPtr vfptr = Marshal.ReadIntPtr(dummyClass);
            IntPtr funcPtr = Marshal.ReadIntPtr(vfptr);
            /*voidMethod1 voidMethod = (voidMethod1)Marshal.GetDelegateForFunctionPointer(funcPtr, typeof(voidMethod1));
            voidMethod(dummyClass);*/

            AssemblyName MyAssemblyName = new AssemblyName();
            MyAssemblyName.Name = "DummyAssembly";
            AssemblyBuilder MyAssemblyBuilder = AppDomain.CurrentDomain.DefineDynamicAssembly(MyAssemblyName, AssemblyBuilderAccess.Run);
            ModuleBuilder MyModuleBuilder = MyAssemblyBuilder.DefineDynamicModule("DummyModule");
            MethodBuilder MyMethodBuilder = MyModuleBuilder.DefineGlobalMethod("DummyFunc", MethodAttributes.Public | MethodAttributes.Static, typeof(void), new Type[] { typeof(int) });
            ILGenerator IL = MyMethodBuilder.GetILGenerator();

            IL.Emit(OpCodes.Ldarg, 0);
            IL.Emit(OpCodes.Ldc_I4, funcPtr.ToInt32());

            IL.EmitCalli(OpCodes.Calli, CallingConvention.ThisCall, typeof(void), new Type[] { typeof(int) });
            IL.Emit(OpCodes.Ret);

            MyModuleBuilder.CreateGlobalFunctions();

            MethodInfo MyMethodInfo = MyModuleBuilder.GetMethod("DummyFunc");

            MyMethodInfo.Invoke(null, new object[] { dummyClass.ToInt32() });

            Console.ReadKey();
        }
    }
}

上文中的方法雖然複雜了一點,但……就是沒什麼用。不用懷疑!

文章寫到這裏,可能有童鞋就要發問了。你說這麼多,tmd到底有啥用?那接下來,我舉一個栗子,activex組件的直接調用!
以前,我們調用activex組件需要做很多複雜的事情,首先需要使用命令行調用regsvr32將dll註冊到系統,然後回到vs去引用com組件是吧

  仔細想想,需要嗎?並不需要,因為兩個原因:

  • COM組件規定DLL需要給出一個DllGetClassObject函數,它就可以為我們在DLL內部new一個所需對象
  • COM組件返回的對象其實就是一個只有虛函數的C++類對象(COM組件規定屬性和事件用getter/setter方式實現)
  • COM組件其實不需要用戶手動註冊,執行regsvr32會操作註冊表,而且32位/64位會混淆,其實regsvr32隻是調用了DLL導出函數DllRegisterServer,而這個函數的實現一般只是把自己註冊到註冊表中,這一步可有可無(特別是對於我們已經知道某個activex的dll存在路徑且它能提供的服務時,如果你非要註冊,使用p/invoke調用該dll的DllRegisterServer函數是一樣的效果)

因此,假如我們有一個activex控件(例如vlc),我們希望把它嵌入我們程序中,我們先看看常規的做法(本文沒有討論帶窗體的vlc,因為窗體這塊兒又複雜一些),直接貼圖:

看起來很簡單,但當我們需要打包給客戶使用時就很麻煩,涉及到嵌入vlc的安裝程序。而當我們會動態內存調用之後,就可以不註冊而使用vlc的功能,我先貼出代碼:

using System;
using System.Runtime.InteropServices;

namespace ConsoleApp3
{
    class Program
    {
        [DllImport("kernel32")]
        static extern IntPtr LoadLibraryEx(string path, IntPtr hFile, int dwFlags);
        [DllImport("kernel32")]
        static extern IntPtr GetProcAddress(IntPtr dll, string func);

        delegate int DllGetClassObject(Guid clsid, Guid iid, ref IntPtr ppv);

        delegate int CreateInstance(IntPtr _thisPtr, IntPtr unkown, Guid iid, ref IntPtr ppv);

        delegate int getVersionInfo(IntPtr _thisPtr, [MarshalAs(UnmanagedType.BStr)] out string bstr);

        static void Main(string[] args)
        {
            IntPtr dll = LoadLibraryEx(@"D:\Program Files\VideoLAN\VLC\axvlc.dll", default, 8);
            IntPtr func = GetProcAddress(dll, "DllGetClassObject");
            DllGetClassObject dllGetClassObject = (DllGetClassObject)Marshal.GetDelegateForFunctionPointer(func, typeof(DllGetClassObject));

            Guid vlc = new Guid("2d719729-5333-406c-bf12-8de787fd65e3");
            Guid clsid = new Guid("9be31822-fdad-461b-ad51-be1d1c159921");
            Guid iidClassFactory = new Guid("00000001-0000-0000-c000-000000000046");
            IntPtr objClassFactory = default;
            dllGetClassObject(clsid, iidClassFactory, ref objClassFactory);
            CreateInstance createInstance = (CreateInstance)Marshal.GetDelegateForFunctionPointer(Marshal.ReadIntPtr(Marshal.ReadIntPtr(objClassFactory) + IntPtr.Size * 3), typeof(CreateInstance));
            IntPtr obj = default;
            createInstance(objClassFactory, default, vlc, ref obj);
            getVersionInfo getVersion = (getVersionInfo)Marshal.GetDelegateForFunctionPointer(Marshal.ReadIntPtr(Marshal.ReadIntPtr(obj) + IntPtr.Size * 18), typeof(getVersionInfo));
            string versionInfo;
            getVersion(obj, out versionInfo);

            Console.ReadKey();
        }
    }
}

  上文中的代碼有幾處可能大家不容易懂,特別是指針偏移量的運算,這裏面有比較複雜的地方,文章篇幅有限,下來咱們細細研究。

  從11年下半年開始學習編程到現在已經很久了,有時候會覺得沒什麼奔頭。其實人生,無外乎兩件事,愛情和青春,我希望大家能有抓住的,就不要放手。兩年前,我為了要和一個女孩子多說幾句話,給人家講COM組件,其實我連c++有虛函數表都不知道,時至今日,我已經失去了她。今後怕是一直會任由靈魂遊盪,半夢半醒,即是人生。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※公開收購3c價格,不怕被賤賣!

※想知道網站建置網站改版該如何進行嗎?將由專業工程師為您規劃客製化網頁設計後台網頁設計

※不管是台北網頁設計公司台中網頁設計公司,全省皆有專員為您服務

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※帶您來看台北網站建置台北網頁設計,各種案例分享

RocketMQ一個新的消費組初次啟動時從何處開始消費呢?

目錄

@(本文目錄)

1、拋出問題

一個新的消費組訂閱一個已存在的Topic主題時,消費組是從該Topic的哪條消息開始消費呢?

首先翻閱DefaultMQPushConsumer的API時,setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)API映入眼帘,從字面意思來看是設置消費者從哪裡開始消費,正是解開該問題的”鑰匙“。ConsumeFromWhere枚舉類圖如下:

  • CONSUME_FROM_MAX_OFFSET
    從消費隊列最大的偏移量開始消費。
  • CONSUME_FROM_FIRST_OFFSET
    從消費隊列最小偏移量開始消費。
  • CONSUME_FROM_TIMESTAMP
    從指定的時間戳開始消費,默認為消費者啟動之前的30分鐘處開始消費。可以通過DefaultMQPushConsumer#setConsumeTimestamp。

是不是點小激動,還不快試試。

需求:新的消費組啟動時,從隊列最後開始消費,即只消費啟動后發送到消息服務器后的最新消息。

1.1 環境準備

本示例所用到的Topic路由信息如下:

Broker的配置如下(broker.conf)

brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH

storePathRootDir=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store
storePathCommitLog=E:/SH2019/tmp/rocketmq_home/rocketmq4.5_simple/store/commitlog
namesrvAddr=127.0.0.1:9876
autoCreateTopicEnable=false
mapedFileSizeCommitLog=10240
mapedFileSizeConsumeQueue=2000

其中重點修改了如下兩個參數:

  • mapedFileSizeCommitLog
    單個commitlog文件的大小,這裏使用10M,方便測試用。
  • mapedFileSizeConsumeQueue
    單個consumequeue隊列長度,這裏使用1000,表示一個consumequeue文件中包含1000個條目。

1.2 消息發送者代碼

public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for (int i = 0; i < 300; i++) {
        try {
            Message msg = new Message("TopicTest" ,"TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
    }
    producer.shutdown();
}

通過上述,往TopicTest發送300條消息,發送完畢后,RocketMQ Broker存儲結構如下:

1.3 消費端驗證代碼

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_01");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

執行上述代碼后,按照期望,應該是不會消費任何消息,只有等生產者再發送消息后,才會對消息進行消費,事實是這樣嗎?執行效果如圖所示:

令人意外的是,竟然從隊列的最小偏移量開始消費了,這就“尷尬”了。難不成是RocketMQ的Bug。帶着這個疑問,從源碼的角度嘗試來解讀該問題,並指導我們實踐。

2、探究CONSUME_FROM_MAX_OFFSET實現原理

對於一個新的消費組,無論是集群模式還是廣播模式都不會存儲該消費組的消費進度,可以理解為-1,此時就需要根據DefaultMQPushConsumer#consumeFromWhere屬性來決定其從何處開始消費,首先我們需要找到其對應的處理入口。我們知道,消息消費者從Broker服務器拉取消息時,需要進行消費隊列的負載,即RebalanceImpl。

溫馨提示:本文不會詳細介紹RocketMQ消息隊列負載、消息拉取、消息消費邏輯,只會展示出通往該問題的簡短流程,如想詳細了解消息消費具體細節,建議購買筆者出版的《RocketMQ技術內幕》書籍。

RebalancePushImpl#computePullFromWhere

public long computePullFromWhere(MessageQueue mq) {
        long result = -1;                                                                                                                                                                                                                  // @1
        final ConsumeFromWhere consumeFromWhere = this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeFromWhere();    
        final OffsetStore offsetStore = this.defaultMQPushConsumerImpl.getOffsetStore();
        switch (consumeFromWhere) {
            case CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST:
            case CONSUME_FROM_MIN_OFFSET:
            case CONSUME_FROM_MAX_OFFSET:
            case CONSUME_FROM_LAST_OFFSET: {                                                                                                                                                                // @2
               // 省略部分代碼
                break;
            }
            case CONSUME_FROM_FIRST_OFFSET: {                                                                                                                                                              // @3
                // 省略部分代碼
                break;
            }
            case CONSUME_FROM_TIMESTAMP: {                                                                                                                                                                  //@4
                // 省略部分代碼
                break;
            }
            default:
                break;
        }
        return result;                                                                                                                                                                                                                  // @5
    }

代碼@1:先解釋幾個局部變量。

  • result
    最終的返回結果,默認為-1。
  • consumeFromWhere
    消息消費者開始消費的策略,即CONSUME_FROM_LAST_OFFSET等。
  • offsetStore
    offset存儲器,消費組消息偏移量存儲實現器。

代碼@2:CONSUME_FROM_LAST_OFFSET(從隊列的最大偏移量開始消費)的處理邏輯,下文會詳細介紹。

代碼@3:CONSUME_FROM_FIRST_OFFSET(從隊列最小偏移量開始消費)的處理邏輯,下文會詳細介紹。

代碼@4:CONSUME_FROM_TIMESTAMP(從指定時間戳開始消費)的處理邏輯,下文會詳細介紹。

代碼@5:返回最後計算的偏移量,從該偏移量出開始消費。

2.1 CONSUME_FROM_LAST_OFFSET計算邏輯

case CONSUME_FROM_LAST_OFFSET: {
    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);   // @1
    if (lastOffset >= 0) {                                                                                                             // @2
        result = lastOffset;
    }
    // First start,no offset
    else if (-1 == lastOffset) {                                                                                                  // @3
        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {               
            result = 0L;
        } else {
            try {
                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);                     
            } catch (MQClientException e) {                                                                              // @4
                result = -1;
            }
        }
    } else {
        result = -1;    
    }
    break;
}

代碼@1:使用offsetStore從消息消費進度文件中讀取消費消費進度,本文將以集群模式為例展開。稍後詳細分析。

代碼@2:如果返回的偏移量大於等於0,則直接使用該offset,這個也能理解,大於等於0,表示查詢到有效的消息消費進度,從該有效進度開始消費,但我們要特別留意lastOffset為0是什麼場景,因為返回0,並不會執行CONSUME_FROM_LAST_OFFSET(語義)。

代碼@3:如果lastOffset為-1,表示當前並未存儲其有效偏移量,可以理解為第一次消費,如果是消費組重試主題,從重試隊列偏移量為0開始消費;如果是普通主題,則從隊列當前的最大的有效偏移量開始消費,即CONSUME_FROM_LAST_OFFSET語義的實現。

代碼@4:如果從遠程服務拉取最大偏移量拉取異常或其他情況,則使用-1作為第一次拉取偏移量。

分析,上述執行的現象,雖然設置的是CONSUME_FROM_LAST_OFFSET,但現象是從隊列的第一條消息開始消費,根據上述源碼的分析,只有從消費組消費進度存儲文件中取到的消息偏移量為0時,才會從第一條消息開始消費,故接下來重點分析消息消費進度存儲器(OffsetStore)在什麼情況下會返回0。

接下來我們將以集群模式來查看一下消息消費進度的查詢邏輯,集群模式的消息進度存儲管理器實現為:
RemoteBrokerOffsetStore,最終Broker端的命令處理類為:ConsumerManageProcessor。

ConsumerManageProcessor#queryConsumerOffset
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
    final QueryConsumerOffsetResponseHeader responseHeader =
        (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
    final QueryConsumerOffsetRequestHeader requestHeader =
        (QueryConsumerOffsetRequestHeader) request
            .decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

    long offset =
        this.brokerController.getConsumerOffsetManager().queryOffset(
            requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());    // @1

    if (offset >= 0) {                                                                                                                                          // @2
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    } else {                                                                                                                                                       // @3
        long minOffset =
            this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
                requestHeader.getQueueId());                                                                                                     // @4
        if (minOffset <= 0
            && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(                                // @5
            requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
            responseHeader.setOffset(0L);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        } else {                                                                                                                                                 // @6
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        }
    }
    return response;
}

代碼@1:從消費消息進度文件中查詢消息消費進度。

代碼@2:如果消息消費進度文件中存儲該隊列的消息進度,其返回的offset必然會大於等於0,則直接返回該偏移量該客戶端,客戶端從該偏移量開始消費。

代碼@3:如果未從消息消費進度文件中查詢到其進度,offset為-1。則首先獲取該主題、消息隊列當前在Broker服務器中的最小偏移量(@4)。如果小於等於0(返回0則表示該隊列的文件還未曾刪除過)並且其最小偏移量對應的消息存儲在內存中而不是存在磁盤中,則返回偏移量0,這就意味着ConsumeFromWhere中定義的三種枚舉類型都不會生效,直接從0開始消費,到這裏就能解開其謎團了(@5)。

代碼@6:如果偏移量小於等於0,但其消息已經存儲在磁盤中,此時返回未找到,最終RebalancePushImpl#computePullFromWhere中得到的偏移量為-1。

看到這裏,大家應該能回答文章開頭處提到的問題了吧?

看到這裏,大家應該明白了,為什麼設置的CONSUME_FROM_LAST_OFFSET,但消費組是從消息隊列的開始處消費了吧,原因就是消息消費進度文件中並沒有找到其消息消費進度,並且該隊列在Broker端的最小偏移量為0,說的更直白點,consumequeue/topicName/queueNum的第一個消息消費隊列文件為00000000000000000000,並且消息其對應的消息緩存在Broker端的內存中(pageCache),其返回給消費端的偏移量為0,故會從0開始消費,而不是從隊列的最大偏移量處開始消費。

為了知識體系的完備性,我們順便來看一下其他兩種策略的計算邏輯。

2.2 CONSUME_FROM_FIRST_OFFSET

case CONSUME_FROM_FIRST_OFFSET: {
    long lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);   // @1
    if (lastOffset >= 0) {    // @2
        result = lastOffset;
    } else if (-1 == lastOffset) {  // @3
        result = 0L;
    } else {                                  
        result = -1;                    // @4
    }
    break;
}

從隊列的開始偏移量開始消費,其計算邏輯如下:
代碼@1:首先通過偏移量存儲器查詢消費隊列的消費進度。

代碼@2:如果大於等於0,則從當前該偏移量開始消費。

代碼@3:如果遠程返回-1,表示並沒有存儲該隊列的消息消費進度,從0開始。

代碼@4:否則從-1開始消費。

2.4 CONSUME_FROM_TIMESTAMP

從指定時戳后的消息開始消費。

case CONSUME_FROM_TIMESTAMP: {
    ong lastOffset = offsetStore.readOffset(mq, ReadOffsetType.READ_FROM_STORE);   // @1
    if (lastOffset >= 0) {                                                                                                            // @2
        result = lastOffset;
    } else if (-1 == lastOffset) {                                                                                                 // @3
        if (mq.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            try {
                result = this.mQClientFactory.getMQAdminImpl().maxOffset(mq);
            } catch (MQClientException e) {
                result = -1;
            }
        } else {
            try {
                long timestamp = UtilAll.parseDate(this.defaultMQPushConsumerImpl.getDefaultMQPushConsumer().getConsumeTimestamp(),
                    UtilAll.YYYYMMDDHHMMSS).getTime();
                result = this.mQClientFactory.getMQAdminImpl().searchOffset(mq, timestamp);
            } catch (MQClientException e) {
                result = -1;
            }
        }
    } else {
        result = -1;
    }
    break;
}

其基本套路與CONSUME_FROM_LAST_OFFSET一樣:
代碼@1:首先通過偏移量存儲器查詢消費隊列的消費進度。

代碼@2:如果大於等於0,則從當前該偏移量開始消費。

代碼@3:如果遠程返回-1,表示並沒有存儲該隊列的消息消費進度,如果是重試主題,則從當前隊列的最大偏移量開始消費,如果是普通主題,則根據時間戳去Broker端查詢,根據查詢到的偏移量開始消費。

原理就介紹到這裏,下面根據上述理論對其進行驗證。

3、猜想與驗證

根據上述理論分析我們得知設置CONSUME_FROM_LAST_OFFSET但並不是從消息隊列的最大偏移量開始消費的“罪魁禍首”是因為消息消費隊列的最小偏移量為0,如果不為0,則就會符合預期,我們來驗證一下這個猜想。
首先我們刪除commitlog目錄下的文件,如圖所示:

其消費隊列截圖如下:

消費端的驗證代碼如下:

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my_consumer_02");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
            ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.out.printf("Consumer Started.%n");
}

運行結果如下:

並沒有消息存在的消息,符合預期。

4、解決方案

如果在生產環境下,一個新的消費組訂閱一個已經存在比較久的topic,設置CONSUME_FROM_MAX_OFFSET是符合預期的,即該主題的consumequeue/{queueNum}/fileName,fileName通常不會是00000000000000000000,如是是上面文件名,想要實現從隊列的最後開始消費,該如何做呢?那就走自動創建消費組的路子,執行如下命令:

./mqadmin updateSubGroup -n 127.0.0.1:9876 -c DefaultCluster -g my_consumer_05

//克隆一個訂閱了該topic的消費組消費進度
./mqadmin cloneGroupOffset -n 127.0.0.1:9876 -s my_consumer_01 -d my_consumer_05 -t TopicTest

//重置消費進度到當前隊列的最大值
./mqadmin resetOffsetByTime -n 127.0.0.1:9876 -g my_consumer_05 -t TopicTest -s -1

按照上上述命令后,即可實現其目的。

您都看到這裏了,麻煩幫忙點個贊,謝謝您的認可與鼓勵。

作者介紹:
丁威,《RocketMQ技術內幕》作者,RocketMQ 社區佈道師,公眾號: 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。歡迎加入我的知識星球,構建一個高質量的技術交流社群。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

Spring Security之多次登錄失敗后賬戶鎖定功能的實現

在上一次寫的文章中,為大家說到了如何動態的從數據庫加載用戶、角色、權限信息,從而實現登錄驗證及授權。在實際的開發過程中,我們通常會有這樣的一個需求:當用戶多次登錄失敗的時候,我們應該將賬戶鎖定,等待一定的時間之後才能再次進行登錄操作。

一、基礎知識回顧

要實現多次登錄失敗賬戶鎖定的功能,我們需要先回顧一下基礎知識:

  • Spring Security 不需要我們自己實現登錄驗證邏輯,而是將用戶、角色、權限信息以實現UserDetails和UserDetailsService接口的方式告知Spring Security。具體的登錄驗證邏輯Spring Security 會幫助我們實現。
  • UserDetails接口中有一個方法叫做isAccountNonLocked()用於判斷賬號是否被鎖定,也就是說我們應該通過該方法對應的set方法setAccountNonLocked(false)告知Spring Security該登錄賬戶被鎖定。
  • 那麼應該在哪裡判斷賬號登錄失敗的次數並執行鎖定機制呢?當然是我們之前文章給大家介紹的《自定義登錄成功及失敗結果處理》的AuthenticationFailureHandler。

建議您先閱讀本文,如果您對本文的實現過程感到迷惑,建議您再翻看本號之前的相關內容。

二、實現多次登錄失敗鎖定的原理

一般來說實現這個需求,我們需要針對每一個用戶記錄登錄失敗的次數nLock和鎖定賬戶的到期時間releaseTime。具體你是把這2個信息存儲在mysql、還是文件中、還是redis中等等,完全取決於你對你所處的應用架構適用性的判斷。具體的實現邏輯無非就是:

  • 登陸失敗之後,從存儲中將nLock取出來加1。
  • 如果nLock大於登陸失敗閾值(比如3次),則將nLock=0,然後設置releaseTime為當前時間加上鎖定周期。通過setAccountNonLocked(false)告知Spring Security該登錄賬戶被鎖定。
  • 如果nLock小於等於1,則將nLock再次存起來。
  • 在一個合適的時機,將鎖定狀態重置為setAccountNonLocked(true)。

這是一種非常典型的實現方式,筆者向大家介紹一款非常有用的開源軟件叫做:ratelimitj。這個軟件的功能主要是為API訪問進行限流,也就是說可以通過制定規則限制API接口的訪問頻率。那恰好登錄驗證接口也是API的一種啊,我們正好也需要限制它在一定的時間內的訪問次數。

三、具體實現

首先需要將ratelimitj通過maven坐標引入到我們的應用裏面來。我們使用的是內存存儲的版本,還有redis存儲的版本,大家可以根據自己的應用情況選用。

        <dependency>
            <groupId>es.moki.ratelimitj</groupId>
            <artifactId>ratelimitj-inmemory</artifactId>
            <version>0.4.1</version>
        </dependency>

之後通過繼承SimpleUrlAuthenticationFailureHandler ,實現onAuthenticationFailure方法。該實現是針對登錄失敗的結果的處理,在我們之前的文章中已經講過。

@Component
public class MyAuthenticationFailureHandler extends SimpleUrlAuthenticationFailureHandler {

    @Autowired
    UserDetailsManager userDetailsManager;

    //規則定義:1小時之內5次機會,就觸發限流行為
    Set<RequestLimitRule> rules = 
            Collections.singleton(RequestLimitRule.of(1 * 60, TimeUnit.MINUTES,5)); 
    RequestRateLimiter limiter = new InMemorySlidingWindowRequestRateLimiter(rules);


    @Override
    public void onAuthenticationFailure(HttpServletRequest request,
                                        HttpServletResponse response, 
                                        AuthenticationException exception) 
                                        throws IOException, ServletException {

         String userId = //從request或request.getSession中獲取登錄用戶名
         //計數器加1,並判斷該用戶是否已經到了觸發了鎖定規則
         boolean reachLimit = limiter.overLimitWhenIncremented(userId);

        if(reachLimit){ //如果觸發了鎖定規則,通過UserDetails告知Spring Security鎖定賬戶
               user.setAccountNonLocked(false);
               userDetailsManager.updateUser(user);
               SysUser user = (SysUser) userDetailsManager.loadUserByUsername(userId);
        }
        

        
        //此處省略通過response做json或html響應
    }
}
  • 核心實現注意看代碼中的註釋
  • 代碼中的SysUser為UserDetails的實現類,如果不知道如何實現請參考本號之前的文章
  • userDetailsManager被用於管理UserDetails信息,通過改變UserDetails改變Spring Security驗證行為。

四、重置鎖定狀態的時機

user.setAccountNonLocked(true);

重置鎖定狀態很簡單,就是上面的代碼。但是更重要的是如何選擇重置鎖定狀態的時機。筆者能想到幾種方案如下

  • 下一次登陸的時候,自定義過濾器,加在Spring Boot過濾器鏈最前端做鎖定狀態重置的判斷。
  • 當登錄賬戶被鎖定之後,之後用戶的每一次登錄都會拋出LockedException。我們完全可以通過Spring Boot的全局異常捕獲機制,在其中捕獲LockedException,並做鎖定狀態的判斷及重置行為。
  • 寫一個Spring 的定時器輪詢,當然這是最差的方案。

    期待您的關注

  • 向您推薦博主的系列文檔:
  • 本文轉載註明出處(必須帶連接,不能只轉文字):。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

※高價收購3C產品,價格不怕你比較

※如何讓商品強力曝光呢? 網頁設計公司幫您建置最吸引人的網站,提高曝光率!!

網頁設計一頭霧水??該從何著手呢? 找到專業技術的網頁設計公司,幫您輕鬆架站!

3c收購,鏡頭 收購有可能以全新價回收嗎?

※想知道最厲害的台北網頁設計公司推薦台中網頁設計公司推薦專業設計師”嚨底家”!!