深入理解 EF Core:EF Core 寫入數據時發生了什麼?

閱讀本文大概需要 14 分鐘。

原文:https://bit.ly/2C67m1C
作者:Jon P Smith
翻譯:王亮
聲明:我翻譯技術文章不是逐句翻譯的,而是根據我自己的理解來表述的。其中可能會去除一些本人實在不知道如何組織但又不影響理解的句子。

這是深入理解 EF Core 系列的第二篇文章。第一篇是關於 EF Core 如何從數據庫讀取數據的;而這一篇是關於 EF Core 如何向數據庫寫入數據的。這是四種數據庫操作 CRUD(新增、讀取、更新和刪除)中的 CUD 部分。

我假設你對 EF Core 已經有了一定的認識,但在深入學習之前,我們先來了解一下如何使用 EF Core,以確保我們已經掌握了一些基本知識。這是一個“深入研究”的課題,所以我準備大量的技術細節,希望我的描述方式你能理解。

本文是“深入理解 EF Core”系列中的第二篇。以下是本系列文章列表:

  • 深入理解 EF Core:當 EF Core 從數據庫讀取數據時發生了什麼?
  • 深入理解 EF Core:當 EF Core 寫入數據到數據庫時發生了什麼?(本文)
  • 深入理解 EF Core:使用查詢過濾器軟刪除數據(敬請期待)

概要

∮. EF Core 可以通過新的或已存在的關聯關係創建一個新的實體。為此,它必須以正確的順序來組織實體類,以便能夠建立各類之間的關聯。這使得開發人員很容易寫出具有複雜關聯關係的類。

∮. 當你調用 EF Core 的 Add 命令來添加一個新條目時,會發生很多事情:

  • EF Core 查找添加的類和其他類的所有關聯。對於每個關聯的類,它也會判斷是否需要在數據庫中創建一個新行,或者僅僅鏈接到數據庫中現有的行。
  • 它使用現有行的主鍵或偽主鍵為新添加的條目填充外鍵信息。

∮. EF Core 可以監測你從數據庫讀取的類的屬性的變化。它通過已讀入的類的隱藏副本來實現這一點。當你調用 SaveChanges 時,它會將每個讀入的屬性值與其原始值進行比較,並且會創建相應的數據更新命令。

∮. EF Core 的 Remove 方法將刪除參數提供的實體類的主鍵所指向的數據行。如果被刪除的類有外鍵關聯,那麼數據庫會自動進行相關的操作(比如級聯刪除),但你可以更改刪除的規則。

數據寫入基礎

提示:如果你已經對 EF Core 有一定的了解,那麼你可以跳過這一部分,這隻是一個簡單的 EF Core 寫入數據的例子。

在這一節的介紹中,我將描述一下本文用到的數據庫結構,然後給出一個簡單的數據庫寫入示例。下面是類/表的基本關係圖:

這些表被映射到具有類似名稱的類,例如 Book、BookAuthor、Author,這些類的屬性名稱與表的字段名稱相同。由於篇幅有限,我不打算展開來講這些類,但您可以在我的 GitHub 倉庫[1]中查看這些類。

和讀取數據一樣,EF Core 將數據寫入數據庫也是五部分:

  1. 數據庫服務器,如 SQL server, Sqlite, PostgreSQL…
  2. 映射到數據庫的一個類或多個類—我將它們稱為實體類
  3. 一個繼承 EF Core 的 DbContext 的類,該類包含 EF Core 的配置
  4. 一個創建數據庫的方法
  5. 最後,向數據庫寫入數據的命令

下面的單元測試代碼來自我的 GitHub 創庫[2],展示了一個簡單的示例,它從現有數據庫中讀取 4 個 Book 實體及其關聯的 BookAuthor 和 Authors 實體。

[Fact]
public void TestWriteTestDataSqliteInMemoryOk()
{
    //SETUP
    var options = SqliteInMemory.CreateOptions<EfCoreContext>();
    using (var context = new EfCoreContext(options))
    {
        context.Database.EnsureCreated();

        //ATTEMPT
        var book = new Book
        {
            Title = "Test",
            Reviews = new List<Review>()
        };
        book.Reviews.Add(new Review { NumStars = 5 });
        context.Add(book);
        context.SaveChanges();

        //VERIFY
        var bookWithReview = context.Books
            .Include(x => x.Reviews).Single()
        bookWithReview.Reviews.Count.ShouldEqual(1);
    }
}

現在,如果我們將單元測試代碼對應到上面的 5 部分,結果是這樣的:

  1. 數據庫服務器——第 5 行:我選擇了一個 Sqlite 數據庫服務器,在本例中是 SqliteInMemory.CreateOptions 方法,它使用我的一個 NuGet 包 EfCore.TestSupport 創建了一個內存數據庫(內存中的數據庫對於單元測試非常有用,因為你可以為這個測試建立一個新的空數據庫)。
  2. 實體類——和上一篇結構差不多,但是多了一個與 Book 關聯的 Review 實體類。
  3. 一個繼承 DbContext 的類——第 6 行:EfCoreContext 類繼承了 DbContext 類並配置了從類到數據庫的映射關係(你可以在我的 GitHub 倉庫[3] 中查看該類)。
  4. 一個創建數據庫的方法——第 8 行:第一次執行時,這句代碼會創建一個新的數據庫,包括創建正確的表、鍵、索引等。EnsureCreated 方法用於單元測試,但對於真實的應用程序,你最好手動執行 EF Core 的 Migration 命令。
  5. 向數據庫寫入數據的命令——第 17 到 18 行:
    • 第 17 行:Add 方法告訴 EF Core 需要將一個 Book 實體及其關係(在本例中,只是一個 Review 實體)寫入數據庫。
    • 第 18 行:SaveChange 方法將在數據庫中的 Books 和 Reviews 表中創建新行。

在 //VERIFY 註釋之後的最後幾行用來檢查數據是否已經被寫入數據庫。

在本例中,你向數據庫添加了新的記錄(SQL 的 INSERT INTO 命令)。EF Core 也可以處理更新和刪除數據庫的數據,下一節介紹這個新增示例,然後介紹其他新增、更新和刪除的示例。

寫入數據時數據庫端發生了什麼

我將從創建一個新的 Book 實體類和新的 Review 實體類開始。這兩個類的關係比較簡單。使用上面單元測試的例子,主要代碼如下:

var book = new Book
{
    Title = "Test",
    Reviews = new List<Review>()
};
book.Reviews.Add(new Review { NumStars = 1 });
context.Add(book);
context.SaveChanges();

為了將這兩個實體添加到數據庫,EF Core 需要這樣做:

  1. 確定它應該以什麼順序創建這些新行——在本例中,它必須在 Books 表中創建一行,這樣它就擁有 Books 的主鍵。
  2. 將主鍵複製到與其關聯的外鍵——在本例中,它將 Books 中的主鍵 BookId 複製到 Review 的外鍵。
  3. 複製數據庫中新創建的數據,以便實體類正確表示數據庫的數據——在這種情況下,它必須複製 BookId 並更新 BookId 屬性,包括 Book 和 Review 實體類以及 Review 實體類的 ReviewId。

下面我們看看上面代碼生成的 SQL 語句:

-- 第一次訪問數據庫
SET NOCOUNT ON;
-- 向數據庫的 Books 表生成一條新數據.
-- 數據庫生成 Books 的主鍵值
INSERT INTO [Books] ([Description], [Title], ...)
VALUES (@p0, @p1, @p2, @p3, @p4, @p5, @p6);

-- 返回主鍵值,檢查並確認數據行是否已添加
SELECT [BookId] FROM [Books]
WHERE @@ROWCOUNT = 1 AND [BookId] = scope_identity();

-- 第二次訪問數據庫
SET NOCOUNT ON;
-- 向數據庫的 Review 表生成一條新數據.
-- 數據庫生成 Review 的主鍵值
INSERT INTO [Review] ([BookId], [Comment], ...)
VALUES (@p7, @p8, @p9, @p10);

-- 返回主鍵值,檢查並確認數據行是否已添加
SELECT [ReviewId] FROM [Review]
WHERE @@ROWCOUNT = 1 AND [ReviewId] = scope_identity();

重要的一點是,EF Core 是按正確的順序處理實體類的,這樣它就可以填充外鍵。這是簡單的例子,但我遇到一個客戶項目的例子是,我不得不建立一個非常複雜的數據組成的 15 個不同的實體類,一些實體類是新增,一些是更新和刪除,EF Core 通過一個 SaveChanges 將把所有工作有序地完成了庫。因此,EF Core 使開發者可以很容易地將複雜的數據寫入數據庫。

我之所以提到這一點,是因為我看到過在 EF Core 代碼中,開發人員多次調用 SaveChanges 方法來從第一個新增命令中獲得主鍵,並把它設置為相關實體的外鍵。例如:

var book = new Book
{
    Title = "Test"
};
context.Add(book);
context.SaveChanges();
var review = new Review { BookId = book.BookId, NumStars = 1 }
context.Add(review);
context.SaveChanges();

雖然這代碼效果是一樣的,但它有一個缺陷——如果第二 SaveChanges 失敗,那麼就會發生部分數據更新到數據庫的情況。在某種情況下,這可能不是個問題,但對於像我客戶那種需要保證數據一致的情況,就非常糟糕了。

因此,從中得到的收穫是,您不需要將主鍵複製到外鍵中,因為你可以設置導航屬性,EF Core 將為您挑選出外鍵。因此,如果你認為需要調用兩次 SaveChanges,那麼通常意味着你沒有設置正確的導航屬性來處理這種情況。

寫數據時 DbContext 做了什麼

在上一節中,你看到了 EF Core 在數據庫端做了什麼,現在你要看看在 EF Core 中發生了什麼。大多數情況,你不需要知道,但有時候知道這些是非常重要的。例如,你只能在 SaveChanges 之前捕獲數據的狀態。而對於自增主鍵,你只有在 SaveChanges 被調用之後才能拿到主鍵的值。

與上一個示例相比,這個示例稍微複雜一些。在這個示例中,我想向你展示 EF Core 通過從數據庫中讀取的已有實體類的實例來處理另一個實體類的新實例。下面的代碼創建了一個新的 Book,但 Author 已經在數據庫中了。代碼註明了階段 1、階段 2 和階段 3,然後我用圖表描述每個階段發生的事情。

// 階段 1
var author = context.Authors.First();
var bookAuthor = new BookAuthor { Author = author };
var book = new Book
{
    Title = "Test Book",
    AuthorsLink = new List<BookAuthor> { bookAuthor }
};

// 階段 2
context.Add(book);

// 階段 3
context.SaveChanges();

接下來的三個圖向你展示了實體類及其跟蹤數據在每個階段內發生的事情。每個圖显示了其階段結束時的以下數據:

  • 流程的每個階段中每個實例的狀態。
  • Book 和 BookAuthor 類是棕色的,表示它們是類的新實例,需要添加到數據庫中,而 Author 實體類是藍色的,表示從數據庫中讀取的實例。
  • 主鍵和外鍵旁邊的括號是其當前的值。如果一個鍵是 (0),那麼它還沒有被設值。
  • 箭頭連線連接的是從導航屬性到其相應實體類。
  • 每個階段之間的變化通過粗體文本或箭頭連線的粗線显示。

下圖显示了階段 1 完成后的情況。用於設置一個新的 Book 實體類(左)和一個新的 BookAuthor 實體類(中),後者將 Book 連接接到一個現有的 Author 實體類(右)。

階段 1 這是調用任何 EF Core 方法之前的起點。

下一個圖显示了執行 context.Add(book) 之後的情況。更改部分以粗體显示。

你可能會驚訝於執行 Add 方法時所發生的事情。它將作為參數提供的實體的狀態設置為 Added(在本例中為 Book 實體)。然後通過導航屬性或外鍵值查看與該實體連接的所有實體。對於每個被連接的實體,它會執行以下操作(注意:我不知道它們執行的確切順序)。

  • 如果實體未被跟蹤(即其當前狀態為 Detached),則將其狀態設置為 Added——在本例中,它是 BookAuthor 實體。
  • 它用主鍵的值填充正確的外鍵的值。如果連接的主鍵還不可用,它將為跟蹤的主鍵和外鍵數據的 CurrentValue 屬性設置一個惟一的負數。你可以在上圖中看到這一點。
  • 它填充當前未設值的導航屬性——如上圖中所示。

最後一個階段,即階段 3,是調用 SaveChanges 方法時發生的情況,如圖所示。

在“寫數據時數據庫端發生了什麼”一節中,數據庫更改的任何列都被複制回實體類中,以便實體與數據庫匹配。在本例中,數據更新到數據庫時會把主鍵值更新到 Book 的 BookId 和 BookAuthor 的 BookId。
而且,此次數據庫寫入完成后,涉及的所有實體的狀態都會被更新為 Unchanged。

對於上面這樣一個很長的解釋,很多時候你不需要知道這些細節,你只管它“工作了”就行。但是,當某些東西不能正常工作或者想做一些複雜的事情時,比如記錄實體類的更改,那麼了解這個就非常有用。

更新數據到數據庫時發生了什麼

上面的示例是關於向數據庫添加新記錄的,但是沒有進行更新。在這一節中,我將展示當你更新數據庫中已有的記錄時會發生什麼。這裏使用我上一篇文章“EF Core 讀取數據時發生了什麼?”中講到的查詢例子。

這個更新很簡單,只有三行,但是它在代碼中有三個階段:讀取、更新和保存。

var books = context.Books.ToList();
books.First().PublishedOn = new DateTime(2020, 1, 1);
context.SaveChanges();

下圖展示了這三個階段:

如你所見,你使用的查詢類型很重要——普通查詢加載數據並把返回的實體保存一份“跟蹤快照”,返回的實體類被稱為“被跟蹤的”。如果實體沒有沒跟蹤,則無法更新它。

注意:上一節中的 Author 實體類也是被“跟蹤”的。在這個例子中,Author 的跟蹤狀態告訴 EF Core Author 已經在數據庫中,因此不會再次創建。

因此,如果你更改了加載的跟蹤實體類中的任何屬性,那麼當你調用 SaveChanges 時,它會將所有跟蹤的實體類與它們的跟蹤快照進行比較。對於每個類,它遍歷映射到數據庫字段的所有屬性。這個過程稱為更改跟蹤,它將檢測被跟蹤實體中的每一個更改,包括 Title、PubishedOn 等非關係屬性。

在這個簡單的示例中,只有 4 個 Book 實體,但在實際應用程序中,您可能已經加載了許多相互連接的實體類。在這一點上,比較階段可能需要一段時間。因此,你應該嘗試只加載需要更改的實體類。

注意:EF Core 有一個名為 Update 的命令,它用於更新每個屬性/列的特定情況。EF Core 會自動跟蹤更改,默認只更新已更改的屬性/列。

每次更新都將創建一個 SQL UPDATE 命令,所有這些更新都將在一個 SQL 事務中執行。使用 SQL 事務意味着所有更新都作為一個整體,如果其中任何一部分失敗,那麼事務中的任何數據庫更改都會失效。

從數據庫刪除數據時發生了什麼

CRUD 的最後一部分是 DELETE,這在某些情況很簡單,你只需要調用 context.Remove()。在另一些情況它很複雜,例如,當你刪除另一個實體類依賴的實體類時會發生什麼?

刪除映射到數據庫的實體類的方法是 Remove。舉個例子,我加載一個特定的 Book,然後刪除它。

var book = context.Books
    .Single(p => p.Title == "Quantum Networking");
context.Remove(book);
context.SaveChanges();

它的階段如下:

  1. 加載要刪除的 Book 實體類。這會獲取它的所有屬性數據,但對於刪除,您實際上只需要實體類的主鍵。
  2. 調用 Remove 方法其實是將 Book 的狀態標記為 Deleted。這些信息會有序地存儲在跟蹤快照中。
  3. 最後,SaveChanges 創建一個 SQL DELETE 命令,該命令與任何其他數據庫更改一起發送到數據庫,並且在一個 SQL 事務中。

這看起來很簡單,但這裏發生了一些重要的事情,從代碼看並不明顯。原來書名為“Quantum Networking”的書有其他一些實體類關聯到到它——在某個特定的測試用例中,書名為“Quantum Networking”的書關聯到以下實體類:

  • 兩個 Review
  • 一個 PriceOffer
  • 一個 BookAuthor

現在,Review、PriceOffer 和 BookAuthor 實體類只與這本書相關——我們使用術語叫依賴於 Book 實體類。因此,如果這本書被刪除了,那麼這些 Review、PriceOffer 和所關聯的 BookAuthor 數據行也應該被刪除。如果不刪除,那麼數據庫的關聯關係就是不正確的,SQL 數據庫將拋出異常。那麼,為什麼做這個刪除工作?

這裏所發生的都是因為設置了級聯刪除,級聯刪除規則設置了 Books 表和三個依賴表之間的數據庫關係。
下面是 EF Core 為創建 Review 表而生成的 SQL 命令的一個示例:

CREATE TABLE [Review] (
    [ReviewId] int NOT NULL IDENTITY,
    [VoterName] nvarchar(max) NULL,
    [NumStars] int NOT NULL,
    [Comment] nvarchar(max) NULL,
    [BookId] int NOT NULL,
    CONSTRAINT [PK_Review] PRIMARY KEY ([ReviewId]),
    CONSTRAINT [FK_Review_Books_BookId] FOREIGN KEY ([BookId])
         REFERENCES [Books] ([BookId]) ON DELETE CASCADE
);

CONSTRAINT 語句部分定義了約束規則,該約束表示 Review 通過 BookId 列鏈接到 Books 表中的一行。在該約束的最後,你將看到關於 DELETE 級聯的規則。它告訴數據庫,如果它鏈接的書被刪除了,那麼這個 Review 也應該被刪除。這意味着書的刪除是允許的,因為所有相關的行也被刪除了。

這是非常有用的,但有時候想要更改刪除規則怎麼辦?比如我決定不允許刪除客戶訂單中存在的書。為了做到這一點,我在 DbContext 中添加了一些 EF Core 配置來改變刪除規則,如下:

public class EfCoreContext : DbContext
{
    private readonly Guid _userId;

    public EfCoreContext(DbContextOptions<EfCoreContext> options)
        : base(options)

    public DbSet<Book> Books { get; set; }
    //… 其它 DbSet<T>

    protected override void OnModelCreating(ModelBuilder modelBuilder
    {
        //… 其它代碼

        modelBuilder.Entity<LineItem>()
            .HasOne(p => p.ChosenBook)
            .WithMany()
            .OnDelete(DeleteBehavior.Restrict);
    }
}

一旦該配置應用到數據庫,就不會生成 SQL 語句的 DELETE CASCADE。這意味着,如果你試圖刪除客戶訂單中的一本書,那麼數據庫將返回一個錯誤,EF Core 將把這個錯誤變成一個異常。

這使你對正在發生的事情有一個更深的了解,但是還有相當多的內容我沒有介紹(但我在我的書中介紹了)。這裡有一些關於刪除我還沒有提到的事情:

  • 實體類之間可以有 required 關係(依賴關係)和 optional 關係,EF Core 為每種類型使用不同的規則。
  • EF Core 可以通過設置 DeleteBehavior 來設置級聯刪除規則,當實體類存在循環關聯關係時,可以用它避免一些錯誤——一些數據庫在發現循環刪除時會拋出錯誤。
  • 你可以在調用 Remove 方法時提供一個新的只有主鍵有值的類來刪除實體類。這在處理只返回主鍵的場景非常有用。

總結

本文我介紹了 CRUD 中的新增、更新和刪除部分,前一篇文章介紹了讀取部分。

正如您所看到的,使用 EF Core 在數據庫中創建記錄很容易,但內部很複雜。你通常不需要知道 EF Core 或數據庫中發生了什麼,但了解一些細節可以讓你更好地利用 EF Core 的優勢。

更新也很簡單——只需在你讀入的實體類中更改一個或多個屬性,當你調用 SaveChanges 時,EF Core 會找到已更改的數據,並構建 SQL 命令更新數據庫。這適用於非關係屬性(如圖 Book 的 Title 屬性)和導航屬性(你可以在他們的關係)。

最後,我們看了一個刪除案例。同樣很容易使用,但很多處理也是在背後執行的。​ 另外,敬請關注我的下一篇文章,我將討論所謂的“軟刪除”。如果你設置了一個標誌,EF Core 就不會再看到這個實體類了,它仍然在數據庫中,但它是隱藏的。

希望本文對你有用,也希望你關注本系列的更多文章。

祝你編程愉快!

[1]. https://bit.ly/2MXK3ZY
[2]. https://bit.ly/2Yza7QQ
[3]. https://bit.ly/2Y0UORO

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

聚甘新

第 10 篇 評論接口

作者:HelloGitHub-追夢人物

此前我們一直在操作博客文章(Post)資源,並藉此介紹了序列化器(Serializer)、視圖集(Viewset)、路由器(Router)等 django-rest-framework 提供的便利工具,藉助這些工具,就可以非常快速地完成 RESTful API 的開發。

評論(Comment)是另一種資源,我們同樣藉助以上工具來完成對評論資源的接口開發。

首先是設計評論 API 的 URL,根據 RESTful API 的設計規範,評論資源的 URL 設計為:/comments/

對評論資源的操作有獲取某篇文章下的評論列表和創建評論兩種操作,因此相應的 HTTP 請求和動作(action)對應如下:

HTTP請求 Action URL
GET list_comments /posts/:id/comments/
POST create /comments/

文章評論列表 API 使用自定義的 action,放在 /post/ 接口的視圖集下;發表評論接口使用標準的 create action,需要定義單獨的視圖集。

然後需要一個序列化器,用於評論資源的序列化(獲取評論時),反序列化(創建評論時)。有了編寫文章序列化器的基礎,評論序列化器就是依葫蘆畫瓢的事。

comments/serializers.py

from rest_framework import serializers
from .models import Comment


class CommentSerializer(serializers.ModelSerializer):
    class Meta:
        model = Comment
        fields = [
            "name",
            "email",
            "url",
            "text",
            "created_time",
            "post",
        ]
        read_only_fields = [
            "created_time",
        ]
        extra_kwargs = {"post": {"write_only": True}}

注意這裏我們在 Meta 中增加了 read_only_fieldsextra_kwargs 的聲明。

read_only_fields 用於指定只讀字段的列表,由於 created_time 是自動生成的,用於記錄評論發布時間,因此聲明為只讀的,不允許通過接口進行修改。

extra_kwargs 指定傳入每個序列化字段的額外參數,這裏給 post 序列化字段傳入了 write_only 關鍵字參數,這樣就將 post 聲明為只寫的字段,這樣 post 字段的值僅在創建評論時需要。而在返回的資源中,post 字段就不會出現。

首先來實現創建評論的接口,先為評論創建一個視圖集:

comments/views.py

from rest_framework import mixins, viewsets
from .models import Comment
from .serializers import CommentSerializer

class CommentViewSet(mixins.CreateModelMixin, viewsets.GenericViewSet):
    serializer_class = CommentSerializer

    def get_queryset(self):
        return Comment.objects.all()

視圖集非常的簡單,混入 CreateModelMixin 后,視圖集就實現了標準的 create action。其實 create action 方法的實現也非常簡單,我們來學習一下 CreateModelMixin 的源碼實現。

class CreateModelMixin:
    """
    Create a model instance.
    """
    def create(self, request, *args, **kwargs):
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        self.perform_create(serializer)
        headers = self.get_success_headers(serializer.data)
        return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)

    def perform_create(self, serializer):
        serializer.save()

    def get_success_headers(self, data):
        try:
            return {'Location': str(data[api_settings.URL_FIELD_NAME])}
        except (TypeError, KeyError):
            return {}

核心邏輯在 create 方法:首先取到綁定了用戶提交數據的序列化器,用於反序列化。接着調用 is_valid 方法校驗數據合法性,如果不合法,會直接拋出異常(raise_exception=True)。否則就執行序列化的 save 邏輯將評論數據存入數據庫,最後返迴響應。

接着在 router 里註冊 CommentViewSet 視圖集:

router.register(r"comments", comments.views.CommentViewSet, basename="comment")

進入 API 交互後台,可以看到首頁列出了 comments 接口的 URL,點擊進入 /comments/ 后可以看到一個評論表單,在這裏可以提交評論數據與創建評論的接口進行交互。

接下來實現獲取評論列表的接口。通常情況下,我們都是只獲取某篇博客文章下的評論列表,因此我們的 API 設計成了 /posts/:id/comments/。這個接口具有很強的語義,非常符合 RESTful API 的設計規範。

由於接口位於 /posts/ 空間下,因此我們在 PostViewSet 添加自定義 action 來實現,先來看代碼:

blog/views.py

class PostViewSet(
    mixins.ListModelMixin, mixins.RetrieveModelMixin, viewsets.GenericViewSet
):
    # ...
    
    @action(
            methods=["GET"],
            detail=True,
            url_path="comments",
            url_name="comment",
            pagination_class=LimitOffsetPagination,
            serializer_class=CommentSerializer,
    )
    def list_comments(self, request, *args, **kwargs):
        # 根據 URL 傳入的參數值(文章 id)獲取到博客文章記錄
        post = self.get_object()
        # 獲取文章下關聯的全部評論
        queryset = post.comment_set.all().order_by("-created_time")
        # 對評論列表進行分頁,根據 URL 傳入的參數獲取指定頁的評論
        page = self.paginate_queryset(queryset)
        # 序列化評論
        serializer = self.get_serializer(page, many=True)
        # 返回分頁后的評論列表
        return self.get_paginated_response(serializer.data)

action 裝飾器我們在上一篇教程中進行了詳細說明,這裏我們再一次接觸到 action 裝飾器更為深入的用法,可以看到我們除了設置 methodsdetailurl_path 這些參數外,還通過設置 pagination_classserializer_class 來覆蓋原本在 PostViewSet 中設置的這些類屬性的值(例如對於分頁,PostViewSet 默認為我們之前設置的 PageNumberPagination,而這裏我們替換為 LimitOffsetPagination)。

list_comments 方法邏輯非常清晰,註釋中給出了詳細的說明。另外還可以看到我們調用了一些輔助方法,例如 paginate_queryset 對查詢集進行分頁;get_paginated_response 返回分頁后的 HTTP 響應,這些方法其實都是 GenericViewSet 提供的通用輔助方法,源碼也並不複雜,如果不用這些方法,我們自己也可以輕鬆實現,但既然 django-rest-framework 已經為我們寫好了,直接復用就行,具體的實現請大家通過閱讀源碼進行學習。

現在進入 API 交互後台,進入某篇文章的詳細接口,例如訪問 /api/posts/5/,Extra Actions 下拉框中可以看到 List comments 的選項:

點擊 List comments 即可進入這篇文章下的評論列表接口,獲取這篇文章的評論列表資源了:

關注公眾號加入交流群

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司“嚨底家”!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

聚甘新

特性速覽| Apache Hudi 0.5.3版本正式發布

1. 下載連接

  • 源代碼下載:Apache Hudi 0.5.3 Source Release (asc, sha512)
  • 0.5.3版本相關jar包地址:https://repository.apache.org/#nexus-search;quick~hudi

2. 遷移指南

  • 這是一個bugfix版本,從0.5.2升級時不需要任何特殊的遷移步驟。如果要從早期版本”X”升級,請閱讀”X”和0.5.3之間的每個後續版本的遷移指南。
  • 0.5.3是Hudi畢業后的第一個版本,因此所有hudi jar的版本名稱中不再帶有”-incubating”。在所有提及hudi版本的地方,請確保不再存在”-incubating”。

例如,hudi-spark-bundle pom依賴如下所示:

<dependency>
	<groupId>org.apache.hudi</groupId>
	<artifactId>hudi-spark-bundle_2.12</artifactId>
	<version>0.5.3</version>
</dependency>

3. 關鍵特性

  • Hudi內置支持 aliyun OSS 對象存儲。

  • 默認情況下將為delta-streamer和spark datasource寫入啟用Embedded Timeline Server。在此版本之前,此功能處於實驗模式,embeddedTimeline Server在Spark Driver中緩存文件列表,並提供Restful接口給Spark Writer任務調用來減少了每次寫入時的list文件列表的操作,此優化對雲上對象存儲非常友好。

  • 默認情況下為delta-streamer和Spark datasource寫入均啟用”增量清理(incremental cleaning)”。在此版本之前,此功能還處於實驗模式,在穩定狀態下,增量清理避免了掃描所有分區的昂貴步驟,而是使用Hudi元數據來查找要清理的文件,此優化也對雲上對象存儲非常友好。

  • 支持將Delta-Streamer配置文件放置在與實際數據不同的文件系統中。

  • Hudi Hive Sync現在支持按日期類型列分區的表。

  • Hudi Hive Sync現在支持直接通過Hive MetaStore進行同步。您只需要設置hoodie.datasource.hive_sync.use_jdbc = false。Hive Metastore Uri將從environment中隱式讀取。例如當通過Spark datasource寫入時,

     spark.write.format(“hudi”)
     .option(…)
     .option(“hoodie.datasource.hive_sync.username”, “<user>”)
     .option(“hoodie.datasource.hive_sync.password”, “<password>”)
     .option(“hoodie.datasource.hive_sync.partition_fields”, “<partition_fields>”)
     .option(“hoodie.datasource.hive_sync.database”, “<db_name>”)
     .option(“hoodie.datasource.hive_sync.table”, “<table_name>”)
     .option(“hoodie.datasource.hive_sync.use_jdbc”, “false”)
     .mode(APPEND)
     .save(“/path/to/dataset”)
    
  • 支持Presto查詢MoR表時Hudi側的改造。

  • 其他與Writer Performance相關的缺陷修復。

    • 現在DataSource Writer避免了寫入后不必要的數據加載。
    • Hudi Writer現在利用spark的併發來加速小文件查找。

4. 感謝

感謝如下貢獻者(排名不分先後): @bhasudha,@yanghua ,@ddong ,@smarthi ,@afilipchik,@zhedoubushishi,@umehrot2,@varadar,@ffcchi,@bschell,@vinothchandar ,@shenh062326,@lamber-ken,@zhaomin1423,@EdwinGuo,@prashantwason ,@pratyakshsharma,@dengziming ,@AakashPradeep,@Jecarm ,@xushiyan ,@cxzl25,@garyli1019 ,@rolandjohann ,@nsivabalan,@leesf ,@jfrazee

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

聚甘新

ConcurrentHashMap源碼解析-Java7

目錄

一.ConcurrentHashMap的模型圖

二.源碼分析-類定義

  2.1 極簡ConcurrentHashMap定義

  2.2 Segment內部類

  2.3 HashEntry內部類

  2.4 ConcurrentHashMap的重要常量

三.常用接口源碼分析

  3.1 ConcurrentHashMap構造方法

  3.2 map.put操作

  3.3 創建新segment

  3.4 segment.put操作

  3.5 segment.rehash擴容

  3.6 map.get操作

  3.7 map.remove操作

  3.8 map.size操作

 

  原文地址:https://www.cnblogs.com/-beyond/p/13157083.html

一.ConcurrentHashMap的模型圖

  之前看了Java8中的HashMap,然後想接着看Java8的ConcurrentHashMap,但是打開Java8的ConcurrentHashMap,瞬間就慫了,將近7k行代碼,而反觀一下Java7的Concurrent,也就在1500多行,那就先選擇少的看吧。畢竟Java7的ConcurrentHashMap更加簡單。下面所有的介紹都是針對Java7而言!!!!!

  下面是ConcurrentHashMap的結構圖,ConcurrentHashMap有一個segments數組,每個segment中又有一個table數組,該數組的每個元素時HashEntry類型。

   

  可以簡單的理解為ConcurrentHashMap是多個HashMap組成,每一個HashMap是一個segment,就如傳說中一樣,ConcurrentHashMap使用分段鎖的方式,這個“段”就是segment。

  ConcurrentHashMap之所以能夠保證併發安全,是因為支持對不同segment的併發修改操作,比如兩個線程同時修改ConcurrentHashMap,一個線程修改第一個segment的數據,另一個線程修改第二個segment的數據,兩個線程可以併發修改,不會出現併發問題;但是多個線程同一個segment進行併發修改,則需要先獲取該segment的鎖后再修改,修改完后釋放鎖,然後其他要修改的線程再進行修改。

  那麼,ConcurrentHashMap可以支持多少併發量呢?這個也就是問,ConcurrentHashMap最多能支持多少線程併發修改,其實也就是segment的數量,只要修改segment的這些線程不是修改同一個segment,那麼這些線程就可以并行執行,這也就是ConcurrentHashMap的併發量(segment的數量)。

  注意,ConcurrentHashMap創建完成后,也就是segment的數量、併發級別已經確定,則segment的數量和併發級別都不能再改變了,即使發生擴容,也是segment中的table進行擴容,segment的數量保持不變。

 

二.源碼分析-類定義

2.1 極簡ConcurrentHashMap定義

  下面是省略了大部分代碼的ConcurrentHashMap定義:

package java.util.concurrent;

import java.util.AbstractMap;
import java.util.concurrent.locks.ReentrantLock;

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {

    final Segment<K, V>[] segments;

    /**
     * segment分段的定義
     */
    static final class Segment<K, V> extends ReentrantLock implements Serializable {

        transient volatile HashEntry<K, V>[] table;
    }

    /**
     * 存放的元素節點
     */
    static final class HashEntry<K, V> {

    }
}

 

2.2 Segment內部類

  ConcurrentHashMap內部有一個segments屬性,是Segment類型的數組,Segment類和HashMap很相似(Java7的HashMap),維持一個數組,每個數組是一個HashEntry,當發生hash衝突后,則使用拉鏈法(頭插法)來解決衝突。

  而Segment數組的定義如下,省略了方法:

static final class Segment<K, V> extends ReentrantLock implements Serializable {
    static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
    private static final long serialVersionUID = 2249069246763182397L;
    
    // segment的負載因子(segments數組中的所有segment負載因子都相同)
    final float loadFactor;
    
    // segment中保存元素的數組
    transient volatile HashEntry<K, V>[] table;
   
    // 該segment中的元素個數
    transient int count;
    
    // modify count,該segment被修改的次數
    transient int modCount;
    
    // segment的擴容閾值
    transient int threshold;
}

  注意每個Segment都有負載因子、以及擴容閾值,但是後面可以看到,其實segments數組中的每一個segment,負載因子和擴容閾值都相同,因為創建的時候,都是使用0號segment的負載因子和擴容閾值。

 

2.3 HashEntry內部類

  Segment類中有一個table數組,table數組的每個元素都是HashEntry類型,和HashMap的Entry類似,源碼如下:(省略了方法)

/**
 * map中每個元素的類型
 */
static final class HashEntry<K, V> {
    final int hash;
    final K key;
    volatile V value;
    volatile HashEntry<K, V> next;
}

 

2.4 ConcurrentHashMap的一些常量

  ConcurrentHashMap中有很多常量,

// 默認初始容量
static final int DEFAULT_INITIAL_CAPACITY = 16;

// 默認的負載因子
static final float DEFAULT_LOAD_FACTOR = 0.75f;

// 默認的併發級別(同時支持多少線程併發修改)
// 因為JDK7中ConcurrentHashMap中是用分段鎖實現併發,不同分段的數據可以進行併發操作,同一個段的數據不能同時修改
static final int DEFAULT_CONCURRENCY_LEVEL = 16;

// 最大容量
static final int MAXIMUM_CAPACITY = 1 << 30;

// 每一個分段的數組容量初始值
static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

// 最多能有多少個segment
static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

// 嘗試對整個map進行操作(比如說統計map的元素數量),可能需要鎖定全部segment;
// 這個常量表示鎖定所有segment前,嘗試的次數
static final int RETRIES_BEFORE_LOCK = 2;

  

三.常用接口源碼分析

3.1 ConcurrentHashMap構造方法

  ConcurrentHashMap有多個構造方法,但是內部其實都是調用同一個進行創建:

public ConcurrentHashMap() {
    this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity) {
    this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}

public ConcurrentHashMap(int initialCapacity, float loadFactor) {
    this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}

/**
 * 統一調用的構造方法
 *
 * @param initialCapacity  初始容量
 * @param loadFactor       負載因子
 * @param concurrencyLevel 併發級別
 */
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
    // 校驗參數合法性
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {
        throw new IllegalArgumentException();
    }

    // 對併發級別進行調整,不允許超過segment的數量(超過segment其實是沒有意義的)
    if (concurrencyLevel > MAX_SEGMENTS) {
        concurrencyLevel = MAX_SEGMENTS;
    }

    // 根據concurrencyLevel確定sshift和ssize的值
    int ssize = 1; // ssize是表示segment的數量,ssize是不小於concurrencyLevel的數,並且是2的n次方
    int sshift = 0;// sshift是ssize轉換為2進制后的位數,比如ssize為16(1000),則sshift為4
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }
    // 比如concurrencyLevel默認為16,走完循環,sshift為4,ssize為16
    // 如果concurrentLevel為8,則sshift為3,ssize為8

    // segment的shift偏移量
    this.segmentShift = 32 - sshift;
    // segment掩碼
    this.segmentMask = ssize - 1;

    // 對傳入的初始容量進行調整(不允許大於最大容量)
    if (initialCapacity > MAXIMUM_CAPACITY) {
        initialCapacity = MAXIMUM_CAPACITY;
    }

    // 假設傳入的容量為128,併發級別為16,則initialCapacity為128,ssize為16
    int c = initialCapacity / ssize;
    // c可以理解為根據傳入的初始容量,計算出每個segment中的數組容量
    if (c * ssize < initialCapacity) {
        ++c;
    }

    // cap表示的是每個segment中的數組容量
    int cap = MIN_SEGMENT_TABLE_CAPACITY;
    // 如果默認的每個segment中的數組長度小於上面計算出來的每個segment的數組長度,則將容量翻倍
    while (cap < c) {
        cap <<= 1;
    }

    // 創建一個segment,作為segments數組的第一個segment
    Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int) (cap * loadFactor), new HashEntry[cap]);

    // 創建segments數組
    Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];

    // 將s0賦值給segments數組的第一個元素(偏移量為0)
    UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]

    // 複製segment數組
    this.segments = ss;
}

/**
 * 傳入map,將map中的元素加入到ConcurrentHashMap中
 * 注意使用默認的負載因子(0.75)和默認的併發級別(16)
 * 初始容量取map容量和默認容量的較大值
 */
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
    this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY),
            DEFAULT_LOAD_FACTOR,
            DEFAULT_CONCURRENCY_LEVEL);
    putAll(m);
}

  

3.2 map.put操作

  map.put,map就是指ConcurrentHashMap,其實就是確定HashEntry應該放入哪個segment中的哪個位置,所以可分為3步:

  1.首先需要確定放入哪個segment;

  2.確定segment后,再確定HashEntry應該放入segment的哪個位置;

  3.進行覆蓋覆蓋或者插入。

/**
 * put操作,注意key或者value為null時,會拋出NPE
 */
@SuppressWarnings("unchecked")
public V put(K key, V value) {
    Segment<K, V> s;
    if (value == null) {
        throw new NullPointerException();
    }

    // 計算key的hash
    int hash = hash(key);

    // hash值右移shift位后 與 mask掩碼進行取與,確定數據應該放到哪個segments數組的哪一個segment中
    int j = (hash >>> segmentShift) & segmentMask;

    // 判斷計算出的segment數組位置上的segment是否為null,如果為null,則進行創建segment
    if ((s = (Segment<K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) {
        s = ensureSegment(j);
    }

    // 創建segment后,將數據put到segment中,調用的segment.put()
    return s.put(key, hash, value, false);
}

  

3.3 創建新segment

  上面put的時候,如果發現segment為null,則會進行調用ensureSegment進行創建segment,代碼如下:

/**
 * 擴容(創建)segment
 *
 * @param k 需要擴容或者需要創建的segment位置
 * @return 返回擴容后的segment
 */
@SuppressWarnings("unchecked")
private Segment<K, V> ensureSegment(int k) {
    final Segment<K, V>[] ss = this.segments;
    long u = (k << SSHIFT) + SBASE; // 傳入的index,對應的偏移量
    Segment<K, V> seg;

    // 判斷是否需要擴容或者創建segment,如果獲取到segment不為null,則返回segment
    if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
        Segment<K, V> proto = ss[0]; // “原型設計模式”,使用segments數組的0號位置segment
        int cap = proto.table.length;// 0號segment的table長度
        float lf = proto.loadFactor; // 0號segment的負載因子
        int threshold = (int) (cap * lf); // 0號segment的擴容閾值

        // 創建一個HashEntry的數組,數組容量和0號segment相同
        HashEntry<K, V>[] tab = (HashEntry<K, V>[]) new HashEntry[cap];

        // 再次檢測,指定的segment是不是為null,如果為null才進行下一步操作
        if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
            // 創建segment
            Segment<K, V> s = new Segment<K, V>(lf, threshold, tab);

            // 使用CAS將新創建的segment填入指定的位置
            while ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
                if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) {
                    break;
                }
            }
        }
    }

    // 返回新增的segment
    return seg;
}

  上面需要注意的是:

  1.創建segment中的table數組時,是使用0號segment的table屬性(長度、負載因子、閾值);

  2.創建segment前會進行再check,check發現segment的確為null時,再進行創建segment;

  3.利用CAS來將創建的segment填入segments數組中;

 

3.4 segment.put操作

  當確定HashEntry應該放到哪個segment后,就開始調用segment的put方法,如下:

/**
 * 在確定應該存放到那個segment后,就segment.put()將k-v存入segment中
 *
 * @param key          put的key
 * @param hash         hash(key)的值
 * @param value        put的value
 * @param onlyIfAbsent true:key對應的Entry不進行覆蓋,false:key對應的entry存在與否,都進行put覆蓋
 * @return
 */
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 先獲取鎖(ReentrantLock,內部使用非公平鎖)
    HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
    V oldValue;
    try {
        HashEntry<K, V>[] tab = table;

        // 根據hash值計算出在segment的table中的位置
        int index = (tab.length - 1) & hash;

        // 定位到segment的table的index位置(第一個節點)
        HashEntry<K, V> first = entryAt(tab, index);

        // 從第一個節點開始遍歷
        for (HashEntry<K, V> e = first; ; ) {
            // 節點不為空,則判斷是否key是否相同(相同HashEntry)
            if (e != null) {
                K k;
                // 比較是否key是否相等(判斷put的key是否已經存在)
                if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                    // 如果key已經存在,則進行覆蓋,如果onlyIsAbsent為false(不關心key對應的Entry是否存在)
                    oldValue = e.value;
                    if (!onlyIfAbsent) {
                        // 覆蓋舊值,修改計數加1
                        e.value = value;
                        ++modCount;
                    }
                    break;
                }
                e = e.next;
            } else {
                // 頭插法,將put的k-v創建新HashEntry,放到first的前面
                if (node != null) {
                    node.setNext(first);
                } else {
                    node = new HashEntry<K, V>(hash, key, value, first);
                }

                // segment中table元素數量加1
                int c = count + 1;

                // 加1后的size大於擴容閾值,且數組的長度小於最大容量,則進行rehash
                if (c > threshold && tab.length < MAXIMUM_CAPACITY) {
                    // 擴容,並進行rehash
                    rehash(node);
                } else {
                    // 將節點放入數組中的指定位置
                    setEntryAt(tab, index, node);
                }

                // 修改次數加一,修改segment的table元素個數
                ++modCount;
                count = c;

                // 舊值為null
                oldValue = null;
                break;
            }
        }
    } finally {
        // 釋放鎖
        unlock();
    }
    return oldValue;
}

  梳理一下,大致在做下面幾件事:

  1.先獲取鎖(ReetrantLock,內部使用非公平鎖NonFairSync);

  2.獲取到鎖后根據hash計算出在table的位置;

  3.遍歷table的HashEntry的鏈表,如果有相同key的,則進行覆蓋,如果沒有,在進行頭插法;

  4.插入鏈表后,確定是否需要擴容,需要則執行rehash;

  5.修改計數(count、modCount),並且釋放鎖。

 

3.5 segment.rehash擴容

  segment擴容時,會將該segment的容量擴容為之前的2倍,並且將各位置的鏈表節點元素進行rehash。

/**
 * 將segment的table容量擴容一倍(newCap=oldCap*2),注意只會擴容該node所在的segment
 *
 * @param node segment[i]->鏈表的頭結點
 */
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K, V> node) {
    HashEntry<K, V>[] oldTable = table;
    int oldCapacity = oldTable.length;

    // 新容量為舊容量的2倍
    int newCapacity = oldCapacity << 1;

    // 設置新的擴容閾值
    threshold = (int) (newCapacity * loadFactor);

    // 申請新數組,數組長度為新容量值
    HashEntry<K, V>[] newTable = (HashEntry<K, V>[]) new HashEntry[newCapacity];

    // 循環遍歷segment的數組(舊數組)
    int sizeMask = newCapacity - 1; // 新的掩碼
    for (int i = 0; i < oldCapacity; i++) {
        // 獲取第i個位置的HashEntry節點,如果該節點為null,則該位置為空,不作處理
        HashEntry<K, V> e = oldTable[i];
        if (e != null) {
            HashEntry<K, V> next = e.next;

            // 計算新位置
            int idx = e.hash & sizeMask;

            // next為null,表示該位置只有一個節點,直接將節點複製到新位置
            if (next == null) {   //  Single node on list
                newTable[idx] = e;
            } else { // Reuse consecutive sequence at same slot
                HashEntry<K, V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K, V> last = next; last != null; last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }
                newTable[lastIdx] = lastRun;
                // 從頭結點開始,開始將節點拷貝到新數組中
                for (HashEntry<K, V> p = e; p != lastRun; p = p.next) {
                    V v = p.value;
                    int h = p.hash;
                    int k = h & sizeMask;
                    HashEntry<K, V> n = newTable[k];
                    newTable[k] = new HashEntry<K, V>(h, p.key, v, n);
                }
            }
        }
    }

    // 將頭結點添加到segment的table中
    int nodeIndex = node.hash & sizeMask; // add the new node
    node.setNext(newTable[nodeIndex]);
    newTable[nodeIndex] = node;
    table = newTable;
}

  為啥擴容的時候沒有加鎖呀?

  其實已經加鎖了,只不過不是在rehash中加鎖!!!因為rehash是在map.put中調用,put的時候已經加鎖了,所以rehash中不用加鎖。

  

3.6 map.get操作

  get操作,先定位到segment,然後定位到segment的具體位置,進行獲取

/**
 * 從ConcurrentHashMap中獲取key對應的value,不需要加鎖
 */
public V get(Object key) {
    Segment<K, V> s;
    HashEntry<K, V>[] tab;

    // 計算key的hash
    int h = hash(key);

    // 計算key處於哪一個segment中(index)
    long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;

    // 獲取數組中該位置的segment,如果該segment的table不為空,那麼就繼續在segment中查找,否則返回null,因為未找到
    if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {

        // tab指向segment的table數組,通過hash計算定位到table數組的位置(然後開始遍歷鏈表)
        HashEntry<K, V> e;
        for (e = (HashEntry<K, V>) UNSAFE.getObjectVolatile(tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
             e != null; e = e.next) {
            K k;
            // 判斷key和hash是否匹配,匹配則證明找到要查找的數據,將數據返回
            if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                return e.value;
        }
    }
    
    return null;
}

  

3.7 map.remove操作

   刪除節點,和get的流程差不多,先定位到segment,然後確定segment的哪個位置(哪條鏈表),遍歷鏈表,找到後進行刪除。

/**
 * 刪除map中key對應的元素
 */
public V remove(Object key) {
    // 計算key的hash
    int hash = hash(key);

    // 根據hash確定segment
    Segment<K, V> s = segmentForHash(hash);

    // 調用segment.remove進行刪除
    return s == null ? null : s.remove(key, hash, null);
}

/**
 * 刪除segment中key對應的hashEntry
 * 如果傳入的value不為空,則會在value匹配的時候進行刪除,否則不操作
 */
final V segmeng.remove(Object key, int hash, Object value) {
    // 獲取鎖失敗,則不斷自旋嘗試獲取鎖
    if (!tryLock()) {
        scanAndLock(key, hash);
    }

    V oldValue = null;
    try {
        HashEntry<K, V>[] tab = table;
        // 定位到segment中table的哪個位置
        int index = (tab.length - 1) & hash;
        HashEntry<K, V> e = entryAt(tab, index);
        HashEntry<K, V> pred = null;

        // 遍歷鏈表
        while (e != null) {
            K k;
            HashEntry<K, V> next = e.next;
            // 如果key和hash都匹配
            if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
                V v = e.value;
                // 如果沒有傳入value,則直接刪除該節點
                // 如果傳入了value,比如調用的map.remove(key,value),則要value匹配才會刪除,否則不操作
                if (value == null || value == v || value.equals(v)) {
                    if (pred == null) { // 頭結點就是要找刪除的元素,next為null,則將null賦值數組的該位置
                        setEntryAt(tab, index, next);
                    } else { // 
                        pred.setNext(next);
                    }

                    // 修改次數加一,map數量減一
                    ++modCount;
                    --count;
                    oldValue = v;
                }
                break;
            }

            // 不匹配時,pred保存當前一次檢測的節點,e指向下一個節點
            pred = e;
            e = next;
        }
    } finally {
        unlock();// 釋放鎖
    }
    return oldValue;
}

  

3.8 map.size操作

  ConcurrentHashMap的size(),需要統計每一個segment中的元素數量(也就是count值),因為不同的segment允許併發修改,統計過程中可能會出現修改操作,導致統計不準確,所以要想準確統計整個map的元素數量,可以這樣做:通過加鎖的方式來解決(將所有的segment都加鎖),這樣就能保證元素不會變化了,這是我們的想法。

  而ConcurrentHashMap是這樣解決的,先嘗試不加鎖進行統計兩次,這兩次統計,不止會統計每個segment的元素數量,還會統計每個segment的modCount(修改次數),進行匯總;如果兩次統計的modCount總量相同,也就說明兩次統計期間沒有修改操作,證明統計的元素總量正確;如果兩次modCount總量不相同,表示有修改操作,則進行重試,如果重試后,發現還是不準確(modCount不匹配),那麼就嘗試為所有的segment加鎖,再進行統計。

  源碼如下:

/**
 * 獲取ConcurrentHashMap中的元素個數,如果元素個數超過Integer.MAX_VALUE,則返回Integer.MAX_VALUE
 */
public int size() {
    final Segment<K, V>[] segments = this.segments;
    int size;           // 返回元素數量(統計結果->元素的總量)
    boolean overflow;   // 標誌是否溢出(是否超過Integer.MAX_VALUE)
    long sum;           // 所有segment的modCount總量次數(整個map的修改次數)
    long last = 0L;     // previous sum,上一輪統計的modCount總量
    int retries = -1;   // 記錄重試的次數

    try {
        // 此處for循環主要用於控制重試
        for (; ; ) {
            // 重試的次數如果達到RETRIES_BEFORE_LOCK,則強制獲取所有segment的鎖
            if (retries++ == RETRIES_BEFORE_LOCK) {
                for (int j = 0; j < segments.length; ++j) {
                    ensureSegment(j).lock();
                    // 強制獲取segment的table第i個位置,並加鎖
                }
            }

            sum = 0L;
            size = 0;
            overflow = false;
            // 開始對segments中的每一個segment中進行統計
            for (int j = 0; j < segments.length; ++j) {
                // 獲取第j個segment
                Segment<K, V> seg = segmentAt(segments, j);
                // 如果segment不為空,則進行統計
                if (seg != null) {
                    sum += seg.modCount;
                    int c = seg.count;
                    // size累加
                    if (c < 0 || (size += c) < 0)
                        overflow = true;
                }
            }

            // 如果本次統計的modCount總量和上次一樣,則表示在這兩次統計之間沒有進行過修改,則不再重試
            if (sum == last) {
                break;
            }
            // 記錄本次統計的modCount總量
            last = sum;
        }
    } finally {
        // 判斷是否加了鎖(如果retries大於RETRIES_BEFORE_LOCK),則證明加了鎖,於是進行釋放鎖
        if (retries > RETRIES_BEFORE_LOCK) {
            for (int j = 0; j < segments.length; ++j)
                segmentAt(segments, j).unlock();
        }
    }
    return overflow ? Integer.MAX_VALUE : size;
}

  

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司“嚨底家”!

※推薦評價好的iphone維修中心

聚甘新

機器學習——打開集成方法的大門,手把手帶你實現AdaBoost模型

本文始發於個人公眾號:TechFlow,原創不易,求個關注

今天是機器學習專題的第25篇文章,我們一起來聊聊AdaBoost。

我們目前為止已經學過了好幾個模型,光決策樹的生成算法就有三種。但是我們每次進行分類的時候,每次都是採用一個模型進行訓練和預測。我們日常在做一個決策的時候,往往會諮詢好幾個人,綜合採納他們的意見。那麼有沒有可能把這個思路照搬到機器學習領域當中,創建多個模型來綜合得出結果呢?

這當然是可以的,這樣的思路就叫做集成方法(ensemble method)。

集成方法

集成方法本身並不是某種具體的方法或者是算法,只是一種訓練機器學習模型的思路。它的含義只有一點,就是訓練多個模型,然後將它們的結果匯聚在一起。

根據這個思路,業內又衍生出了三種特定的方法,分別是Bagging、Boosting和Stacking。

Bagging

Bagging是bootstrap aggregating的縮寫,我們從字面上很難理解它的含義。我們記住這個名字即可,在Bagging方法當中,我們會通過有放回隨機採樣的方式創建K個數據集。對於每一個數據集來說,可能有一些單個的樣本重複出現,也可能有一些樣本從沒有出現過,但整體而言,每個樣本出現的概率是相同的。

之後,我們用抽樣出來的K個數據集訓練K個模型,這裏的模型沒有做限制,我們可以使用任何機器學習方模型。K個模型自然會得到K個結果,那麼我們採取民主投票的方式對這K個模型進行聚合。

舉個例子說,假設K=25,在一個二分類問題當中。有10個模型預測結果是0,15個模型預測結果是1。那麼最終整個模型的預測結果就是1,相當於K個模型民主投票,每個模型投票權一樣。大名鼎鼎的隨機森林就是採取的這種方式。

Boosting

Boosting的思路和Bagging非常相似,它們對於樣本的採樣邏輯是一致的。不同的是,在Boosting當中,這K個模型並不是同時訓練的,而是串行訓練的。每一個模型在訓練的時候都會基於之前模型的結果,更加關注於被之前模型判斷錯誤的樣本。同樣,樣本也會有一個權值,錯誤判斷率越大的樣本擁有越大的權值。

並且每一個模型根據它能力的不同,會被賦予不同的權重,最後會對所有模型進行加權求和,而不是公平投票。由於這個機制,使得模型在訓練的時候的效率也有差異。因為Bagging所有模型之間是完全獨立的,我們是可以採取分佈式訓練的。而Boosting中每一個模型會依賴之前模型的效果,所以只能串行訓練。

Stacking

Stacking是Kaggle比賽當中經常使用的方法,它的思路也非常簡單。我們選擇K種不同的模型,然後通過交叉驗證的方式,在訓練集上進行訓練和預測。保證每個模型都對所有的訓練樣本產出一個預測結果。那麼對於每一條訓練樣本,我們都能得到K個結果。

之後,我們再創建一個第二層的模型,它的訓練特徵就是這K個結果。也就是說Stacking方法當中會用到多層模型的結構,最後一層模型的訓練特徵是上層模型預測的結果。由模型自己去訓練究竟哪一個模型的結果更值得採納,以及如何組合模型之間的特長。

我們今天介紹的AdaBoost顧名思義,是一個經典的Boosting算法。

模型思路

AdaBoost的核心思路是通過使用Boosting的方法,通過一些弱分類器構建出強分類器來。

強分類器我們都很好理解,就是性能很強的模型,那麼弱分類器應該怎麼理解呢?模型的強弱其實是相對於隨機結果來定義的,比隨機結果越好的模型,它的性能越強。從這點出發,弱分類器也就是只比隨機結果略強的分類器。我們的目的是通過設計樣本和模型的權重,使得可以做出最佳決策,將這些弱分類器的結果綜合出強分類器的效果來。

首先我們會給訓練樣本賦予一個權重,一開始的時候,每一條樣本的權重均相等。根據訓練樣本訓練出一個弱分類器並計算這個分類器的錯誤率。然後在同一個數據集上再次訓練弱分類器,在第二次的訓練當中,我們將會調整每個樣本的權重。其中正確的樣本權重會降低,錯誤的樣本權重會升高

同樣每一個分類器也會分配到一個權重值,權重越高說明它的話語權越大。這些是根據模型的錯誤率來計算的。錯誤率定義為:

這裏的D表示數據集表示分類錯誤的集合,它也就等於錯誤分類的樣本數除以總樣本數。

有了錯誤率之後,我們可以根據下面這個公式得到

得到了之後,我們利用它對樣本的權重進行更新,其中分類正確的權重更改為:

分類錯誤的樣本權重更改為:

這樣,我們所有的權重都更新完了,這也就完成了一輪迭代。AdaBoost會反覆進行迭代和調整權重,直到訓練錯誤率為0或者是弱分類器的數量達到閾值。

代碼實現

首先,我們來獲取數據,這裏我們選擇了sklearn數據集中的乳腺癌預測數據。和之前的例子一樣,我們可以直接import進來使用,非常方便:

import numpy as np
import pandas as pd
from sklearn.datasets import load_breast_cancer

breast = load_breast_cancer()
X, y = breast.data, breast.target
# reshape,將一維向量轉成二維
y = y.reshape((-1, 1))

接着,我們將數據拆分成訓練數據和測試數據,這個也是常規做法了,沒有難度:

from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=23)

在AdaBoost模型當中,我們選擇的弱分類器是決策樹的樹樁。所謂的樹樁就是樹深為1的決策樹。樹深為1顯然不論我們怎麼選擇閾值,都不會得到特別好的結果,但是由於我們依然會選擇閾值和特徵,所以結果也不會太差,至少要比隨機選擇要好。所以這就保證了,我們可以得到一個比隨機選擇效果略好一些的弱分類器,並且它的實現非常簡單。

在我們實現模型之前,我們先來實現幾個輔助函數。

def loss_error(y_pred, y, weight):
    return weight.T.dot((y_pred != y_train))

def stump_classify(X, idx, threshold, comparator):
    if comparator == 'lt':
        return X[:, idx] <= threshold
    else:
        return X[:, idx] > threshold
    
def get_thresholds(X, i):
    min_val, max_val = X[:, i].min(), X[:, i].max()
    return np.linspace(min_val, max_val, 10)

這三個函數應該都不難理解,第一個函數當中我們計算了模型的誤差。由於我們每一個樣本擁有一個自身的權重,所以我們對誤差進行加權求和。第二個函數是樹樁分類器的預測函數,邏輯非常簡單,根據閾值比較大小。這裡有兩種情況,有可能小於閾值的樣本是正例,也有可能大於閾值的樣本是正例,所以我們還需要第三個參數記錄這個信息。第三個函數是生成閾值的函數,由於我們並不需要樹樁的性能特別好,所以我們也沒有必要去遍歷閾值的所有取值,簡單地把特徵的範圍劃分成10段即可。

接下來是單個樹樁的生成函數,它等價於決策樹當中選擇特徵進行數據拆分的函數,邏輯大同小異,只需要稍作修改即可。

def build_stump(X, y, weight):
    m, n = X.shape
    ret_stump, ret_pred = None, []
    best_error = float('inf')

    # 枚舉特徵
    for i in range(n):
        # 枚舉閾值
        for j in get_thresholds(X, i):
            # 枚舉正例兩種情況
            for c in ['lt', 'gt']:
                # 預測並且求誤差
                pred = stump_classify(X, i, j, c).reshape((-1, 1))
                err = loss_error(pred, y, weight)
                # 記錄下最好的樹樁
                if err < best_error:
                    best_error, ret_pred = err, pred.copy()
                    ret_stump = {'idx': i, 'threshold': j, 'comparator': c} 
    return ret_stump, best_error, ret_pred

接下來要做的就是重複生成樹樁的操作,計算,並且更新每一條樣本的權重。整個過程也沒有太多的難點,基本上就是照着實現公式:

def adaboost_train(X, y, num_stump):
    stumps = []
    m = X.shape[0]
    # 樣本權重初始化,一開始全部相等
    weight = np.ones((y_train.shape[0], 1)) / y_train.shape[0]
    # 生成num_stump個樹樁
    for i in range(num_stump):
        best_stump, err, pred = build_stump(X, y, weight)
        # 計算alpha
        alpha = 0.5 * np.log((1.0 - err) / max(err, 1e-10))
        best_stump['alpha'] = alpha
        stumps.append(best_stump)

        # 更新每一條樣本的權重
        for j in range(m):
            weight[j] = weight[j] * (np.exp(-alpha) if pred[j] == y[j] else np.exp(alpha))
        weight = weight / weight.sum()
        # 如果當前的準確率已經非常高,則退出
        if err < 1e-8:
            break
    return stumps

樹樁生成結束之後,最後就是預測的部分了。整個預測過程依然非常簡單,就是一個加權求和的過程。這裏要注意一下,我們在訓練的時候為了突出錯誤預測的樣本,讓模型擁有更好的能力,維護了樣本的權重。然而在預測的時候,我們是不知道預測樣本的權重的,所以我們只需要對模型的結果進行加權即可。

def adaboost_classify(X, stumps):
    m = X.shape[0]
    pred = np.ones((m, 1))
    alphs = 0.0
    for i, stump in enumerate(stumps):
        y_pred = stump_classify(X, stump['idx'], stump['threshold'], stump['comparator'])
        # 根據alpha加權求和
        pred = y_pred * stump['alpha']
        alphs += stump['alpha']
    pred /= alphs
    # 根據0.5劃分0和1類別
    return np.sign(pred).reshape((-1, 1))

到這裏,我們整個模型就實現完了,我們先來看下單個樹樁在訓練集上的表現:

可以看到準確率只有0.54,只是比隨機預測略好一點點而已。

然而當我們綜合了20個樹樁的結果之後,在訓練集上我們可以得到0.9的準確率。在預測集上,它的表現更好,準確率有接近0.95!

這是因為AdaBoost當中,每一個分類器都是弱分類器,它根本沒有過擬合的能力,畢竟在訓練集的表現都很差,這就保證了分類器學到的都是實在的泛化能力,在訓練集上適用,在測試集上很大概率也適用。這也是集成方法最大的優點之一。

總結

集成方法可以說是機器學習領域一個非常重要的飛躍,集成方法的出現,讓設計出一個強分類器這件事的難度大大降低,並且還保證了模型的效果。

因為在一些領域當中,設計一個強分類器可能非常困難,然而設計一個弱一些的分類器則簡單得多,再加上模型本身性能很好,不容易陷入過擬合。使得在深度學習模型流行之前,集成方法廣泛使用,幾乎所有機器學習領域的比賽的冠軍,都使用了集成學習。

集成學習當中具體的思想或許各有不同,但是核心的思路是一致的。我們理解了AdaBoost之後,再去學習其他的集成模型就要容易多了。

如果喜歡本文,可以的話,請點個關注,給我一點鼓勵,也方便獲取更多文章。

本文使用 mdnice 排版

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

聚甘新

從linux源碼看epoll

從linux源碼看epoll

前言

在linux的高性能網絡編程中,繞不開的就是epoll。和select、poll等系統調用相比,epoll在需要監視大量文件描述符並且其中只有少數活躍的時候,表現出無可比擬的優勢。epoll能讓內核記住所關注的描述符,並在對應的描述符事件就緒的時候,在epoll的就緒鏈表中添加這些就緒元素,並喚醒對應的epoll等待進程。
本文就是筆者在探究epoll源碼過程中,對kernel將就緒描述符添加到epoll並喚醒對應進程的一次源碼分析(基於linux-2.6.32內核版本)。由於篇幅所限,筆者聚焦於tcp協議下socket可讀事件的源碼分析。

簡單的epoll例子

下面的例子,是從筆者本人用c語言寫的dbproxy中的一段代碼。由於細節過多,所以做了一些刪減。

int init_reactor(int listen_fd,int worker_count){
	......
	// 創建多個epoll fd,以充分利用多核
	for(i=0;i<worker_count;i++){
		reactor->worker_fd = epoll_create(EPOLL_MAX_EVENTS);
	}
	/* epoll add listen_fd and accept */
	// 將accept后的事件加入到對應的epoll fd中
	int client_fd = accept(listen_fd,(struct sockaddr *)&client_addr,&client_len)));
	// 將連接描述符註冊到對應的worker裏面
	epoll_ctl(reactor->client_fd,EPOLL_CTL_ADD,epifd,&event);
}
// reactor的worker線程
static void* rw_thread_func(void* arg){
	......

	for(;;){
		  // epoll_wait等待事件觸發
        int retval = epoll_wait(epfd,events,EPOLL_MAX_EVENTS,500);
        if(retval > 0){
        	for(j=0; j < retval; j++){
        		// 處理讀事件
        	   if(event & EPOLLIN){
                 handle_ready_read_connection(conn);
                 continue;
             }
             /* 處理其它事件 */
        	}
        }
	}
	......
}

上述代碼事實上就是實現了一個reactor模式中的accept與read/write處理線程,如下圖所示:

epoll_create

Unix的萬物皆文件的思想在epoll裏面也有體現,epoll_create調用返回一個文件描述符,此描述符掛載在anon_inode_fs(匿名inode文件系統)的根目錄下面。讓我們看下具體的epoll_create系統調用源碼:

SYSCALL_DEFINE1(epoll_create, int, size)
{
	if (size <= 0)
		return -EINVAL;

	return sys_epoll_create1(0);
}

由上述源碼可見,epoll_create的參數是基本沒有意義的,kernel簡單的判斷是否為0,然後就直接就調用了sys_epoll_create1。由於linux的系統調用是通過(SYSCALL_DEFINE1,SYSCALL_DEFINE2……SYSCALL_DEFINE6)定義的,那麼sys_epoll_create1對應的源碼即是SYSCALL_DEFINE(epoll_create1)。
(注:受限於寄存器數量的限制,(80×86下的)kernel限制系統調用最多有6個參數。據ulk3所述,這是由於32位80×86寄存器的限制)
接下來,我們就看下epoll_create1的源碼:

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
	// kzalloc(sizeof(*ep), GFP_KERNEL),用的是內核空間
	error = ep_alloc(&ep);
	// 獲取尚未被使用的文件描述符,即描述符數組的槽位
	fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
	// 在匿名inode文件系統中分配一個inode,並得到其file結構體
	// 且file->f_op = &eventpoll_fops
	// 且file->private_data = ep;
	file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
				 O_RDWR | (flags & O_CLOEXEC));
	// 將file填入到對應的文件描述符數組的槽裏面
	fd_install(fd,file);			 
	ep->file = file;
	return fd;
}

最後epoll_create生成的文件描述符如下圖所示:

struct eventpoll

所有的epoll系統調用都是圍繞eventpoll結構體做操作,現簡要描述下其中的成員:

/*
 * 此結構體存儲在file->private_data中
 */
struct eventpoll {
	// 自旋鎖,在kernel內部用自旋鎖加鎖,就可以同時多線(進)程對此結構體進行操作
	// 主要是保護ready_list
	spinlock_t lock;
	// 這個互斥鎖是為了保證在eventloop使用對應的文件描述符的時候,文件描述符不會被移除掉
	struct mutex mtx;
	// epoll_wait使用的等待隊列,和進程喚醒有關
	wait_queue_head_t wq;
	// file->poll使用的等待隊列,和進程喚醒有關
	wait_queue_head_t poll_wait;
	// 就緒的描述符隊列
	struct list_head rdllist;
	// 通過紅黑樹來組織當前epoll關注的文件描述符
	struct rb_root rbr;
	// 在向用戶空間傳輸就緒事件的時候,將同時發生事件的文件描述符鏈入到這個鏈表裡面
	struct epitem *ovflist;
	// 對應的user
	struct user_struct *user;
	// 對應的文件描述符
	struct file *file;
	// 下面兩個是用於環路檢測的優化
	int visited;
	struct list_head visited_list_link;
};

本文講述的是kernel是如何將就緒事件傳遞給epoll並喚醒對應進程上,因此在這裏主要聚焦於(wait_queue_head_t wq)等成員。

epoll_ctl(add)

我們看下epoll_ctl(EPOLL_CTL_ADD)是如何將對應的文件描述符插入到eventpoll中的。
藉助於spin_lock(自旋鎖)和mutex(互斥鎖),epoll_ctl調用可以在多個KSE(內核調度實體,即進程/線程)中併發執行。

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
		struct epoll_event __user *, event)
{
	/* 校驗epfd是否是epoll的描述符 */
	// 此處的互斥鎖是為了防止併發調用epoll_ctl,即保護內部數據結構
	// 不會被併發的添加修改刪除破壞
	mutex_lock_nested(&ep->mtx, 0);
	switch (op) {
		case EPOLL_CTL_ADD:
			...
			// 插入到紅黑樹中
			error = ep_insert(ep, &epds, tfile, fd);
			...
			break;
		......
	}
	mutex_unlock(&ep->mtx);	
}		

上述過程如下圖所示:

ep_insert

在ep_insert中初始化了epitem,然後初始化了本文關注的焦點,即事件就緒時候的回調函數,代碼如下所示:

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
		     struct file *tfile, int fd)
{
	/* 初始化epitem */
	// &epq.pt->qproc = ep_ptable_queue_proc
	init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
	// 在這裏將回調函數注入
	revents = tfile->f_op->poll(tfile, &epq.pt);
	// 如果當前有事件已經就緒,那麼一開始就會被加入到ready list
	// 例如可寫事件
	// 另外,在tcp內部ack之後調用tcp_check_space,最終調用sock_def_write_space來喚醒對應的epoll_wait下的進程
	if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
		list_add_tail(&epi->rdllink, &ep->rdllist);
		// wake_up ep對應在epoll_wait下的進程
		if (waitqueue_active(&ep->wq)){
			wake_up_locked(&ep->wq);
		}
		......
	}	
	// 將epitem插入紅黑樹
	ep_rbtree_insert(ep, epi);
	......
}

tfile->f_op->poll的實現

向kernel更底層註冊回調函數的是tfile->f_op->poll(tfile, &epq.pt)這一句,我們來看一下對於對應的socket文件描述符,其fd=>file->f_op->poll的初始化過程:

    // 將accept后的事件加入到對應的epoll fd中
    int client_fd = accept(listen_fd,(struct sockaddr *)&client_addr,&client_len)));
    // 將連接描述符註冊到對應的worker裏面
    epoll_ctl(reactor->client_fd,EPOLL_CTL_ADD,epifd,&event);

回顧一下上述user space代碼,fd即client_fd是由tcp的listen_fd通過accept調用而來,那麼我們看下accept調用鏈的關鍵路徑:

accept
      |->accept4
            |->sock_attach_fd(newsock, newfile, flags & O_NONBLOCK);
                  |->init_file(file,...,&socket_file_ops);
                        |->file->f_op = fop;
                              /* file->f_op = &socket_file_ops */
            |->fd_install(newfd, newfile); // 安裝fd

那麼,由accept獲得的client_fd的結構如下圖所示:

(注:由於是tcp socket,所以這邊sock->ops=inet_stream_ops,這個初始化的過程在我的另一篇博客<<從linux源碼看socket的阻塞和非阻塞>>中,博客地址如下:
https://my.oschina.net/alchemystar/blog/1791017)
既然知道了tfile->f_op->poll的實現,我們就可以看下此poll是如何將安裝回調函數的。

回調函數的安裝

kernel的調用路徑如下:

sock_poll /*tfile->f_op->poll(tfile, &epq.pt)*/;
	|->sock->ops->poll
		|->tcp_poll
			/* 這邊重要的是拿到了sk_sleep用於KSE(進程/線程)的喚醒 */
			|->sock_poll_wait(file, sk->sk_sleep, wait);
				|->poll_wait
					|->p->qproc(filp, wait_address, p);
					/* p為&epq.pt,而且&epq.pt->qproc= ep_ptable_queue_proc*/
						|-> ep_ptable_queue_proc(filp,wait_address,p);

繞了一大圈之後,我們的回調函數的安裝其實就是調用了eventpoll.c中的ep_ptable_queue_proc,而且向其中傳遞了sk->sk_sleep作為其waitqueue的head,其源碼如下所示:

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
				 poll_table *pt)
{
	// 取出當前client_fd對應的epitem
	struct epitem *epi = ep_item_from_epqueue(pt);
	// &pwq->wait->func=ep_poll_callback,用於回調喚醒
	// 注意,這邊不是init_waitqueue_entry,即沒有將當前KSE(current,當前進程/線程)寫入到
	// wait_queue當中,因為不一定是從當前安裝的KSE喚醒,而應該是喚醒epoll\_wait的KSE
	init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
	// 這邊的whead是sk->sk_sleep,將當前的waitqueue鏈入到socket對應的sleep列表
	add_wait_queue(whead, &pwq->wait);	
}	

這樣client_fd的結構進一步完善,如下圖所示:

ep_poll_callback函數是喚醒對應epoll_wait的地方,我們將在後面一起講述。

epoll_wait

epoll_wait主要是調用了ep_poll:

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
		int, maxevents, int, timeout)
{
	/* 檢查epfd是否是epoll\_create創建的fd */
	// 調用ep_poll
	error = ep_poll(ep, events, maxevents, timeout);
	...
}

緊接着,我們看下ep_poll函數:

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
		   int maxevents, long timeout)
{
	......
retry:
	// 獲取spinlock
	spin_lock_irqsave(&ep->lock, flags);
	// 將當前task_struct寫入到waitqueue中以便喚醒
	// wq_entry->func = default_wake_function;
	init_waitqueue_entry(&wait, current);
	// WQ_FLAG_EXCLUSIVE,排他性喚醒,配合SO_REUSEPORT從而解決accept驚群問題
	wait.flags |= WQ_FLAG_EXCLUSIVE;
	// 鏈入到ep的waitqueue中
	__add_wait_queue(&ep->wq, &wait);
	for (;;) {
		// 設置當前進程狀態為可打斷
		set_current_state(TASK_INTERRUPTIBLE);
		// 檢查當前線程是否有信號要處理,有則返回-EINTR
		if (signal_pending(current)) {
			res = -EINTR;
			break;
		}
		spin_unlock_irqrestore(&ep->lock, flags);
		// schedule調度,讓出CPU
		jtimeout = schedule_timeout(jtimeout);
		spin_lock_irqsave(&ep->lock, flags);
	}
	// 到這裏,表明超時或者有事件觸發等動作導致進程重新調度
	__remove_wait_queue(&ep->wq, &wait);
	// 設置進程狀態為running
	set_current_state(TASK_RUNNING);
	......
	// 檢查是否有可用事件
	eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
	......
	// 向用戶空間拷貝就緒事件
	ep_send_events(ep, events, maxevents)
}		   

上述邏輯如下圖所示:

ep_send_events

ep_send_events函數主要就是調用了ep_scan_ready_list,顧名思義ep_scan_ready_list就是掃描就緒列表:

static int ep_scan_ready_list(struct eventpoll *ep,
			      int (*sproc)(struct eventpoll *,
					   struct list_head *, void *),
			      void *priv,
			      int depth)
{
	...
	// 將epfd的rdllist鏈入到txlist
	list_splice_init(&ep->rdllist, &txlist);
	...
	/* sproc = ep_send_events_proc */
	error = (*sproc)(ep, &txlist, priv);
	...
	// 處理ovflist,即在上面sproc過程中又到來的事件
	...
}

其主要調用了ep_send_events_proc:

static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
			       void *priv)
{
	for (eventcnt = 0, uevent = esed->events;
	     !list_empty(head) && eventcnt < esed->maxevents;) {
	   // 遍歷ready list 
		epi = list_first_entry(head, struct epitem, rdllink);
		list_del_init(&epi->rdllink);
		// readylist只是表明當前epi有事件,具體的事件信息還是得調用對應file的poll
		// 這邊的poll即是tcp_poll,根據tcp本身的信息設置掩碼(mask)等信息 & 上興趣事件掩碼,則可以得知當前事件是否是epoll_wait感興趣的事件
		revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
			epi->event.events;
		if(revents){
			/* 將event放入到用戶空間 */
			/* 處理ONESHOT邏輯 */
			// 如果不是邊緣觸發,則將當前的epi重新加回到可用列表中,這樣就可以下一次繼續觸發poll,如果下一次poll的revents不為0,那麼用戶空間依舊能感知 */
			else if (!(epi->event.events & EPOLLET)){
				list_add_tail(&epi->rdllink, &ep->rdllist);
			}
			/* 如果是邊緣觸發,那麼就不加回可用列表,因此只能等到下一個可用事件觸發的時候才會將對應的epi放到可用列表裡面*/
			eventcnt++
		}
		/* 如poll出來的revents事件epoll_wait不感興趣(或者本來就沒有事件),那麼也不會加回到可用列表 */
		......
	}
	return eventcnt;
}			    

上述代碼邏輯如下所示:

事件到來添加到epoll就緒隊列(rdllist)的過程

經過上述章節的詳述之後,我們終於可以闡述,tcp在數據到來時是怎麼加入到epoll的就緒隊列的了。

可讀事件到來

首先我們看下tcp數據包從網卡驅動到kernel內部tcp協議處理調用鏈:

step1:

網絡分組到來的內核路徑,網卡發起中斷後調用netif_rx將事件掛入CPU的等待隊列,並喚起軟中斷(soft_irq),再通過linux的軟中斷機制調用net_rx_action,如下圖所示:

注:上圖來自PLKA(<<深入Linux內核架構>>)

step2:

緊接着跟蹤next_rx_action

next_rx_action
	|-process_backlog
		......
			|->packet_type->func 在這裏我們考慮ip_rcv
					|->ipprot->handler 在這裏ipprot重載為tcp_protocol
						(handler 即為tcp_v4_rcv)					

我們再看下對應的tcp_v4_rcv

tcp_v4_rcv
      |->tcp_v4_do_rcv
            |->tcp_rcv_state_process
                  |->tcp_data_queue
                        |-> sk->sk_data_ready(sock_def_readable)
                              |->wake_up_interruptible_sync_poll(sk->sleep,...)
                                    |->__wake_up
                                          |->__wake_up_common
                                                |->curr->func
                                                /* 這裏已經被ep_insert添加為ep_poll_callback,而且設定了排它標識WQ_FLAG_EXCLUSIVE*/
                                                      |->ep_poll_callback

這樣,我們就看下最終喚醒epoll_wait的ep_poll_callback函數:

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
	// 獲取wait對應的epitem	
	struct epitem *epi = ep_item_from_wait(wait);
	// epitem對應的eventpoll結構體
	struct eventpoll *ep = epi->ep;
	// 獲取自旋鎖,保護ready_list等結構
	spin_lock_irqsave(&ep->lock, flags);
	// 如果當前epi沒有被鏈入ep的ready list,則鏈入
	// 這樣,就把當前的可用事件加入到epoll的可用列表了
	if (!ep_is_linked(&epi->rdllink))
		list_add_tail(&epi->rdllink, &ep->rdllist);
	// 如果有epoll_wait在等待的話,則喚醒這個epoll_wait進程
	// 對應的&ep->wq是在epoll_wait調用的時候通過init_waitqueue_entry(&wait, current)而生成的
	// 其中的current即是對應調用epoll_wait的進程信息task_struct
	if (waitqueue_active(&ep->wq))
		wake_up_locked(&ep->wq);
}

上述過程如下圖所示:

最後wake_up_locked調用__wake_up_common,然後調用了在init_waitqueue_entry註冊的default_wake_function,調用路徑為:

wake_up_locked
	|->__wake_up_common
		|->default_wake_function
			|->try_wake_up (wake up a thread)
				|->activate_task
					|->enqueue_task    running

將epoll_wait進程推入可運行隊列,等待內核重新調度進程,然後epoll_wait對應的這個進程重新運行后,就從schedule恢復,繼續下面的ep_send_events(向用戶空間拷貝事件並返回)。
wake_up過程如下圖所示:

可寫事件到來

可寫事件的運行過程和可讀事件大同小異:
首先,在epoll_ctl_add的時候預先會調用一次對應文件描述符的poll,如果返回事件里有可寫掩碼的時候直接調用wake_up_locked以喚醒對應的epoll_wait進程。
然後,在tcp在底層驅動有數據到來的時候可能攜帶了ack從而可以釋放部分已經被對端接收的數據,於是觸發可寫事件,這一部分的調用鏈為:

tcp_input.c
tcp_v4_rcv
	|-tcp_v4_do_rcv
		|-tcp_rcv_state_process
			|-tcp_data_snd_check
				|->tcp_check_space
					|->tcp_new_space
						|->sk->sk_write_space
						/* tcp下即是sk_stream_write_space*/

最後在此函數裏面sk_stream_write_space喚醒對應的epoll_wait進程

void sk_stream_write_space(struct sock *sk)
{
	// 即有1/3可寫空間的時候才觸發可寫事件
	if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk) && sock) {
		clear_bit(SOCK_NOSPACE, &sock->flags);

		if (sk->sk_sleep && waitqueue_active(sk->sk_sleep))
			wake_up_interruptible_poll(sk->sk_sleep, POLLOUT |
						POLLWRNORM | POLLWRBAND)
		......
	}
}

關閉描述符(close fd)

值得注意的是,我們在close對應的文件描述符的時候,會自動調用eventpoll_release將對應的file從其關聯的epoll_fd中刪除,kernel關鍵路徑如下:

close fd
      |->filp_close
            |->fput
                  |->__fput
                        |->eventpoll_release
                              |->ep_remove

所以我們在關閉對應的文件描述符后,並不需要通過epoll_ctl_del來刪掉對應epoll中相應的描述符。

總結

epoll作為linux下非常優秀的事件觸發機製得到了廣泛的運用。其源碼還是比較複雜的,本文只是闡述了epoll讀寫事件的觸發機制,探究linux kernel源碼的過程非常快樂_

公眾號

關注筆者公眾號,獲取更多乾貨文章:

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

聚甘新

html/css 滾動到元素位置,显示加載動畫

每次滾動到元素時,都显示加載動畫,如何添加?

 

元素添加初始參數

以上圖中的動畫為例,添加倆個左右容器,將內容放置在容器內部。

添加初始數據,默認透明度0、左右分別移動100px。

 1   //左側容器
 2   .item-leftContainer {
 3     opacity: 0;
 4     transform: translateX(-100px);
 5   }
 6   //右側容器
 7   .item-rightContainer {
 8     opacity: 0;
 9     transform: translateX(100px);
10   }

添加動畫數據

在less中添加動畫數據。這裏只設置了to,也可以省略第1步中的初始參數設置而在動畫里設置from。

執行后,透明度由0到1,倆個容器向中間移動100px回到原處。

 1   //動畫
 2   @keyframes showLeft {
 3     to {
 4       opacity: 1;
 5       transform: translateX(0px);
 6     }
 7   }
 8   @keyframes showRight {
 9     to {
10       opacity: 1;
11       transform: translateX(0px);
12     }
13   }
14   @keyframes hideLeft {
15     to {
16       opacity: 0;
17       transform: translateX(-100px);
18     }
19   }
20   @keyframes hideRight {
21     to {
22       opacity: 0;
23       transform: translateX(100px);
24     }
25   }

觸發動畫

頁面加載/刷新觸發 – 在componentDidMount中執行

頁面滾動時觸發 – 在componentDidMount、componentWillUnmount添加監聽/註銷頁面滾動的事件

校驗當前滾動高度與元素的位置差異:

window.pageYOffset(滾動距離) + windowHeight(窗口高度) > leftElement.offsetTop (元素的相對位置)+ parentOffsetTop(父元素的相對位置) + 200

  1. 真正的滾動視覺位置 – window.pageYOffset(滾動距離) + windowHeight(窗口高度)
  2. 元素距離頂部的高度 – 這裏使用了leftElement.offsetTop + parentOffsetTop,原因是父容器使用了absolute絕對定位。如果是正常布局的話,使用元素當前的位置leftElement.offsetTop即可
  3. 額外添加200高度,是為了優化視覺體驗。當超出200高度時才觸發動畫

當頁面滾動到下方,觸發显示動畫;當頁面重新滾動到上方,觸發隱藏動畫。

 1     componentDidMount() {
 2         this.checkScrollHeightAndLoadAnimation();
 3         window.addEventListener('scroll', this.bindHandleScroll);
 4     }
 5     componentWillUnmount() {
 6         window.removeEventListener('scroll', this.bindHandleScroll);
 7     }
 8     bindHandleScroll = (event) => {
 9         this.checkScrollHeightAndLoadAnimation();
10     }
11     checkScrollHeightAndLoadAnimation() {
12         const windowHeight = window.innerHeight;
13         let parentEelement = document.getElementById("softwareUsingWays-container") as HTMLElement;
14         const parentOffsetTop = parentEelement.offsetTop;
15         let leftElement = (parentEelement.getElementsByClassName("item-leftContainer") as HTMLCollectionOf<HTMLElement>)[0];
16         if (window.pageYOffset + windowHeight > leftElement.offsetTop + parentOffsetTop + 200) {
17             leftElement.style.animation = "showLeft .6s forwards" //添加動畫  
18         } else {
19             leftElement.style.animation = "hideLeft 0s forwards" //隱藏動畫 
20         }
21         let rightElement = (parentEelement.getElementsByClassName(".item-rightContainer") as HTMLCollectionOf<HTMLElement>)[0];
22         if (window.pageYOffset + windowHeight > rightElement.offsetTop + parentOffsetTop + 200) {
23             rightElement.style.animation = "showRight .6s forwards" //添加動畫  
24         } else {
25             rightElement.style.animation = "hideRight 0s forwards" //隱藏動畫 
26         }
27     }

 

關鍵字:React 滾動、加載/出現動畫

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

聚甘新

六、線程池(一)

線程池

通過建立池可以有效的利用系統資源,節約系統性能。Java 中的線程池就是一種非常好的實現,從 JDK1.5 開始 Java 提供了一個線程工廠 Executors 用來生成線程池,通過 Executors 可以方便的生成不同類型的線程池。

線程池的優點

  • 降低資源消耗。線程的開啟和銷毀會消耗資源,通過重複利用已創建的線程降低線程創建和銷毀造成的消耗。
  • 提高響應速度。當任務到達時,任務可以不需要的等到線程創建就能立即執行。
  • 提高線程的可管理性。線程是稀缺資源,如果無限制的創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一的分配,調優和監控。

常見的線程池

  • CachedThreadPool:可緩存的線程池,該線程池中沒有核心線程,非核心線程的數量為 Integer.max_value,就是無限大,當有需要時創建線程來執行任務,沒有需要時回收線程,適用於耗時少,任務量大的情況。
  • SecudleThreadPool:周期性執行任務的線程池,按照某種特定的計劃執行線程中的任務,有核心線程,但也有非核心線程,非核心線程的大小也為無限大。適用於執行周期性的任務。
  • SingleThreadPool:只有一條線程來執行任務,適用於有順序的任務的應用場景。
  • FixedThreadPool:定長的線程池,有核心線程,核心線程的即為最大的線程數量,沒有非核心線程
  • Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor() 和 Executors.newCachedThreadPool() 等方法的底層都是通過 ThreadPoolExecutor 實現的。

ThreadPoolExecutor

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        // maximumPoolSize 必須大於 0,且必須大於 corePoolSize
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

參數介紹:

  • corePoolSize

    • 線程池的核心線程數。在沒有設置 allowCoreThreadTimeOut 為 true 的情況下,核心線程會在線程池中一直存活,即使處於閑置狀態。
    • 如果設置為 0,則表示在沒有任何任務時,銷毀線程池;如果大於 0,即使沒有任務時也會保證線程池的線程數量等於此值。
    • 但需要注意,此值如果設置的比較小,則會頻繁的創建和銷毀線程,如果設置的比較大,則會浪費系統資源,所以需要根據自己的實際業務來調整此值。
  • maximumPoolSize

    • 線程池所能容納的最大線程數。當活動線程(核心線程+非核心線程)達到這個數值后,後續任務將會根據 RejectedExecutionHandler 來進行拒絕策略處理。
    • 官方規定此值必須大於 0,也必須大於等於 corePoolSize,此值只有在任務比較多,且不能存放在任務隊列時,才會用到。
  • keepAliveTime

    • 非核心線程閑置時的超時時長。超過該時長,非核心線程就會被回收。
    • 若線程池通過 allowCoreThreadTimeOut() 方法設置 allowCoreThreadTimeOut 屬性為 true,則該時長同樣會作用於核心線程,AsyncTask 配置的線程池就是這樣設置的。
  • unit

    • keepAliveTime 時長對應的單位。
  • workQueue

    • 表示線程池執行的任務隊列,當線程池的所有線程都在處理任務時,如果來了新任務就會緩存到此任務隊列中排隊等待執行。
    • 是一個阻塞隊列 BlockingQueue,雖然它是 Queue 的子接口,但是它的主要作用並不是容器,而是作為線程同步的工具,他有一個特徵,當生產者試圖向 BlockingQueue 放入(put)元素,如果隊列已滿,則該線程被阻塞;當消費者試圖從 BlockingQueue 取出(take)元素,如果隊列已空,則該線程被阻塞。
  • ThreadFactory

    • 線程的創建工廠,功能很簡單,就是為線程池提供創建新線程的功能。
    • 也可以自定義一個線程工廠,通過實現 ThreadFactory 接口來完成,這樣就可以自定義線程的名稱或線程執行的優先級了。
    • 通常在創建線程池時不指定此參數,它會使用默認的線程創建工廠的方法來創建線程,源代碼如下:
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        // Executors.defaultThreadFactory() 為默認的線程創建工廠
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    public static ThreadFactory defaultThreadFactory() {
        return new DefaultThreadFactory();
    }
    // 默認的線程創建工廠,需要實現 ThreadFactory 接口
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
    
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
        // 創建線程
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon()) 
                t.setDaemon(false); // 創建一個非守護線程
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY); // 線程優先級設置為默認值
            return t;
        }
    }
    
  • RejectedExecutionHandler

    • 表示指定線程池的拒絕策略,當線程池的任務已經在緩存隊列 workQueue 中存儲滿了之後,並且不能創建新的線程來執行此任務時,就會用到此拒絕策略.
    • 它屬於一種限流保護的機制,這裡有四種任務拒絕類型:
      1. AbortPolicy: 不執行新任務,直接拋出異常,提示線程池已滿,涉及到該異常的任務也不會被執行,線程池默認的拒絕策略就是該策略。
      2. DisCardPolicy: 不執行新任務,也不拋出異常,即忽略此任務;
      3. DisCardOldSetPolicy: 將消息隊列中的第一個任務(即等待時間最久的任務)替換為當前新進來的任務執行,忽略最早的任務(最先加入隊列的任務);
      4. CallerRunsPolicy: 把任務交給當前線程來執行;
    /**
     * 線程池的拒絕策略
     */
    @Test
    public void test1() {
        // 創建線程池 核心線程為1,最大線程為3,任務隊列大小為2
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 3, 10,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(2),
                new ThreadPoolExecutor.AbortPolicy() // 添加 AbortPolicy 拒絕策略
        );
    
    
        for (int i = 0; i < 6; i++) {
            poolExecutor.execute(() -> {
                System.out.println(Thread.currentThread().getName());
            });
        }
        
    }
    
    • 自定義線程池拒絕策略
    /**
     * 自定義線程池的拒絕策略
     * 實現接口 RejectedExecutionHandler
     */
    @Test
    public void test2() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 3, 10,
                TimeUnit.SECONDS,
                new LinkedBlockingDeque<>(2),
                new RejectedExecutionHandler() {
    
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        // 業務處理方法
                        System.out.println("執行自定義拒絕策略");
                    }
                }
        );
    
        for (int i = 0; i < 6; i++) {
            executor.execute(() -> {
                System.out.println(Thread.currentThread().getName());
            });
        }
    
    }
    

線程池工作原理

線程池的工作流程要從它的執行方法 execute() 說起,源碼如下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 當前工作的線程數小於核心線程數
    if (workerCountOf(c) < corePoolSize) {
        // 創建新的線程執行此任務
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 檢查線程池是否處於運行狀態,如果是則把任務添加到隊列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再出檢查線程池是否處於運行狀態,防止在第一次校驗通過後線程池關閉
        // 如果是非運行狀態,則將剛加入隊列的任務移除
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 如果線程池的線程數為 0 時(當 corePoolSize 設置為 0 時會發生)
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false); // 新建線程執行任務
    }
    // 核心線程都在忙且隊列都已爆滿,嘗試新啟動一個線程執行失敗
    else if (!addWorker(command, false)) 
        // 執行拒絕策略
        reject(command);
}

execute() VS submit()

  • execute() 和 submit() 都是用來執行線程池任務的,它們最主要的區別是,submit() 方法可以接收線程池執行的返回值,而 execute() 不能接收返回值。
  • sumbit 之所以可以接收返回值,是因為參數中可以傳遞:Callable task,而通過 callable 創建的線程任務有返回值並且可以拋出異常。
/**
 * execute VS sumbin
 * execute 提交任務沒有返回值
 * submit 提交任務有返回值
 */
@Test
public void test3() throws ExecutionException, InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 10, 10, TimeUnit.SECONDS, new LinkedBlockingDeque<>(20));
    // execute
    executor.execute(new Runnable() {
        @Override
        public void run() {
            System.out.println("Hello, execute");
        }
    });

    // submit 使用
    Future<String> future = executor.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            System.out.println("Hello, submit");
            return "submit success";
        }
    });
    System.out.println(future.get());
}
  • 它們的另一個區別是 execute() 方法屬於 Executor 接口的方法,而 submit() 方法則是屬於 ExecutorService 接口的方法。

線程池的使用:

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @author xiandongxie
 */
public class ThreadPool {

    //參數初始化 返回Java虛擬機可用的處理器數量
//    private static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
    private static final int CPU_COUNT = 2;
    //核心線程數量大小
    private static final int corePoolSize = Math.max(2, Math.min(CPU_COUNT - 1, 4));
    //線程池最大容納線程數
    private static final int maximumPoolSize = CPU_COUNT * 2 + 1;
    //線程空閑后的存活時長
    private static final int keepAliveTime = 30;

    //任務過多后,存儲任務的一個阻塞隊列
    BlockingQueue<Runnable> workQueue = new SynchronousQueue<>();

    //線程的創建工廠
    ThreadFactory threadFactory = new ThreadFactory() {
        private final AtomicInteger mCount = new AtomicInteger(1);

        public Thread newThread(Runnable r) {
            return new Thread(r, "AdvacnedAsyncTask #" + mCount.getAndIncrement());
        }
    };

    //線程池任務滿載后採取的任務拒絕策略: 不執行新任務,直接拋出異常,提示線程池已滿
    RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();

    //線程池對象,創建線程
    ThreadPoolExecutor mExecute = new ThreadPoolExecutor(
            corePoolSize,
            maximumPoolSize,
            keepAliveTime,
            TimeUnit.SECONDS,
            workQueue,
            threadFactory,
            rejectHandler
    );

    public static void main(String[] args) {
        System.out.println("main start ..... \nCPU_COUNT = " + CPU_COUNT + "\tcorePoolSize=" + corePoolSize + "\tmaximumPoolSize=" + maximumPoolSize);
        
        ThreadPool threadPool = new ThreadPool();
        ThreadPoolExecutor execute = threadPool.mExecute;
        // 預啟動所有核心線程
        execute.prestartAllCoreThreads();

        for (int i = 0; i < 5; i++) {
            execute.execute(new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName() + "\tstart..." + System.currentTimeMillis());
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println(Thread.currentThread().getName() + "\tend..." + System.currentTimeMillis());
                }
            });
        }
        execute.shutdown();
        
        System.out.println("main end .....");
    }
}

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司“嚨底家”!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

聚甘新

Spring Boot 2 實戰:利用Redis的Geo功能實現查找附近的位置

1. 前言

老闆突然要上線一個需求,獲取當前位置方圓一公里的業務代理點。明天上線!當接到這個需求的時候我差點吐血,這時間也太緊張了。趕緊去查相關的技術選型。經過一番折騰,終於在晚上十點完成了這個需求。現在把大致實現的思路總結一下。

2. MySQL 不合適

遇到需求,首先要想到現有的東西能不能滿足,成本如何。

MySQL是我首先能夠想到的,畢竟大部分數據要持久化到MySQL。但是使用MySQL需要自行計算Geohash。需要使用大量數學幾何計算,並且需要學習地理相關知識,門檻較高,短時間內不可能完成需求,而且長期來看這也不是MySQL擅長的領域,所以沒有考慮它。

Geohash 參考 https://www.cnblogs.com/LBSer/p/3310455.html

2. Redis 中的GEO

Redis是我們最為熟悉的K-V數據庫,它常被拿來作為高性能的緩存數據庫來使用,大部分項目都會用到它。從3.2版本開始它開始提供了GEO能力,用來實現諸如附近位置、計算距離等這類依賴於地理位置信息的功能。GEO相關的命令如下:

Redis命令 描述
GEOHASH 返回一個或多個位置元素的 Geohash 表示
GEOPOS 從key里返回所有給定位置元素的位置(經度和緯度)
GEODIST 返回兩個給定位置之間的距離
GEORADIUS 以給定的經緯度為中心, 找出某一半徑內的元素
GEOADD 將指定的地理空間位置(緯度、經度、名稱)添加到指定的key中
GEORADIUSBYMEMBER 找出位於指定範圍內的元素,中心點是由給定的位置元素決定

Redis會假設地球為完美的球形, 所以可能有一些位置計算偏差,據說<=0.5%,對於有嚴格地理位置要求的需求來說要經過一些場景測試來檢驗是否能夠滿足需求。

2.1 寫入地理信息

那麼如何實現目標單位半徑內的所有元素呢?我們可以將所有的位置的經緯度通過上表中的GEOADD將這些地理信息轉換為52位的Geohash寫入Redis

該命令格式:

geoadd key longitude latitude member [longitude latitude member ...]

對應例子:

redis> geoadd cities:locs 117.12 39.08 tianjin 114.29 38.02  shijiazhuang 
(integer) 2

意思是將經度為117.12緯度為39.08的地點tianjin和經度為114.29緯度為38.02的地點shijiazhuang加入keycities:locssorted set集合中。可以添加一到多個位置。然後我們就可以藉助於其他命令來進行地理位置的計算了。

有效的經度從-180度到180度。有效的緯度從-85.05112878度到85.05112878度。當坐標位置超出上述指定範圍時,該命令將會返回一個錯誤。

2.2 統計單位半徑內的地區

我們可以藉助於GEORADIUS來找出以給定經緯度,某一半徑內的所有元素。

該命令格式:

georadius key longtitude latitude radius m|km|ft|mi [WITHCOORD] [WITHDIST] [WITHHASH] [COUNT count] [ASC|DESC] 

這個命令比GEOADD要複雜一些:

  • radius 半徑長度,必選項。後面的mkmftmi、是長度單位選項,四選一。
  • WITHCOORD 將位置元素的經度和維度也一併返回,非必選。
  • WITHDIST 在返回位置元素的同時, 將位置元素與中心點的距離也一併返回。 距離的單位和查詢單位一致,非必選。
  • WITHHASH 返回位置的52位精度的Geohash值,非必選。這個我反正很少用,可能其它一些偏向底層的LBS應用服務需要這個。
  • COUNT 返回符合條件的位置元素的數量,非必選。比如返回前10個,以避免出現符合的結果太多而出現性能問題。
  • ASC|DESC 排序方式,非必選。默認情況下返回未排序,但是大多數我們需要進行排序。參照中心位置,從近到遠使用ASC ,從遠到近使用DESC

例如,我們在 cities:locs 中查找以(115.03,38.44)為中心,方圓200km的城市,結果包含城市名稱、對應的坐標和距離中心點的距離(km),並按照從近到遠排列。命令如下:

redis> georadius cities:locs 115.03 38.44 200 km WITHCOORD WITHDIST ASC
1) 1) "shijiazhuang"
   2) "79.7653"
   3) 1) "114.29000169038772583"
      2) "38.01999994251037407"
2) 1) "tianjin"
   2) "186.6937"
   3) 1) "117.02000230550765991"
      2) "39.0800000535766543"

你可以加上 COUNT 1來查找最近的一個位置。

3. 基於Redis GEO實戰

大致的原理思路說完了,接下來就是實操了。結合Spring Boot應用我們應該如何做?

3.1 開發環境

需要具有GEO特性的Redis版本,這裏我使用的是Redis 4 。另外我們客戶端使用 spring-boot-starter-data-redis 。這裏我們會使用到 RedisTemplate對象。

3.2 批量添加位置信息

第一步,我們需要將位置數據初始化到Redis中。在Spring Data Redis中一個位置坐標(lng,lat) 可以封裝到org.springframework.data.geo.Point對象中。然後指定一個名稱,就組成了一個位置Geo信息。RedisTemplate提供了批量添加位置信息的方法。我們可以將章節2.1中的添加命令轉換為下面的代碼:

   Map<String, Point> points = new HashMap<>();
   points.put("tianjin", new Point(117.12, 39.08));
   points.put("shijiazhuang", new Point(114.29, 38.02));
   // RedisTemplate 批量添加 Geo
   redisTemplate.boundGeoOps("cities:locs").add(points);

可以結合Spring Boot 提供的ApplicationRunner接口來實現初始化。

@Bean
public ApplicationRunner cacheActiveAppRunner(RedisTemplate<String, String> redisTemplate) {

    return args -> {
        final String GEO_KEY = "cities:locs";

        // 清理緩存
        redisTemplate.delete(GEO_KEY);
        
        Map<String, Point> points = new HashMap<>();
        points.put("tianjin", new Point(117.12, 39.08));
        points.put("shijiazhuang", new Point(114.29, 38.02));
        // RedisTemplate 批量添加 GeoLocation
        BoundGeoOperations<String, String> geoOps = redisTemplate.boundGeoOps(GEO_KEY);
        geoOps.add(points);
    };
}

地理數據持久化到MySQL,然後同步到Redis中。

3.3 查詢附近的特定位置

RedisTemplate 針對GEORADIUS命令也有封裝:

GeoResults<GeoLocation<M>> radius(K key, Circle within, GeoRadiusCommandArgs args)

Circle對象是封裝覆蓋的面積(圖1),需要的要素為中心點坐標Point對象、半徑(radius)、計量單位(metric), 例如:

Point point = new Point(115.03, 38.44);

Metric metric = RedisGeoCommands.DistanceUnit.KILOMETERS;
Distance distance = new Distance(200, metric);

Circle circle = new Circle(point, distance);

GeoRadiusCommandArgs用來封裝GEORADIUS的一些可選命令參數,參見章節2.2中的WITHCOORDCOUNTASC等,例如我們需要在返回結果中包含坐標、中心距離、由近到遠排序的前5條數據:

RedisGeoCommands.GeoRadiusCommandArgs args = RedisGeoCommands
        .GeoRadiusCommandArgs
        .newGeoRadiusArgs()
        .includeDistance()
        .includeCoordinates()
        .sortAscending()
        .limit(limit);

然後執行 radius方法就會拿到GeoResults<RedisGeoCommands.GeoLocation<String>>封裝的結果,我們對這個可迭代對象進行解析就可以拿到我們想要的數據:

GeoResults<RedisGeoCommands.GeoLocation<String>> radius = redisTemplate.opsForGeo()
        .radius(GEO_STAGE, circle, args);

if (radius != null) {
    List<StageDTO> stageDTOS = new ArrayList<>();
    radius.forEach(geoLocationGeoResult -> {
        RedisGeoCommands.GeoLocation<String> content = geoLocationGeoResult.getContent();
        //member 名稱  如  tianjin 
        String name = content.getName();
        // 對應的經緯度坐標
        Point pos = content.getPoint();
        // 距離中心點的距離
        Distance dis = geoLocationGeoResult.getDistance();
    });
}

3.4 刪除元素

有時候我們可能需要刪除某個位置元素,但是RedisGeo並沒有刪除成員的命令。不過由於它的底層是zset,我們可以藉助zrem命令進行刪除,對應的Java代碼為:

redisTemplate.boundZSetOps(GEO_STAGE).remove("tianjin");

4. 總結

今天我們使用RedisGeo特性實現了常見的附近的地理信息查詢需求,簡單易上手。其實使用另一個Nosql數據庫MongoDB也可以實現。在數據量比較小的情況下Redis已經能很好的滿足需要。如果數據量大可使用MongoDB來實現。 文中涉及的DEMO可通過我個人博客獲取。

關注公眾號:Felordcn 獲取更多資訊

個人博客:https://felord.cn

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

聚甘新

年僅 28 歲就宣布從字節跳動退休?

這两天,互聯網熱議最大的一個話題除了阿里 P8 程序員找生活助理的事,另外一個就是 28 歲的郭宇宣布從字節跳動退休,稱選擇經營溫泉旅行,選擇成為一名職業作家。

我看到這個話題的時候,情不自禁地“嘖嘖”了兩聲,真特么酸了,28 歲就退休了,我已經 31 歲了,還在辛苦打拚的路上,除了要忙工作,還要高產似母豬地更文,然而,即便我這麼努力,還是沒能成為一名“職業作家”,退休更是遙遙無期。

郭大佬非常牛逼的一點在於,高考之後就開始敲代碼,上了大學之後依然敲代碼,大三就在支付寶干過,然後創業的一家公司被字節跳動收購,再然後嘛,就財務自由退休了——28 歲,重新定義了退休的年紀。

字節跳動這家公司發展的真的是巨快,明星產品今日頭條和抖音,真的是國民級應用。反正我父母都是這兩款產品的忠實用戶,我妹妹雖然不玩今日頭條,但抖音玩得那叫一個熱火朝天。

我自己是不玩抖音也不看今日頭條的,因為覺得這種短視頻,或者說亂七八糟的新聞熱點有點浪費生命的感覺,所以一直很抵觸。

當然了,我如果說我一次也沒玩過,有點聖人的感覺,我做不到。但每次無聊到刷上倆小時的抖音,我就會噁心到把這款軟件卸載掉。尤其是聽到那些無厘頭的狂笑,我感覺到娛樂在致死。

這不是抖音的問題,是我的問題,是人性的問題,抖音就抓住了人性的弱點,讓你沉浸其中,忘乎所以。

抨擊歸抨擊,但我不能忽視的事實是,字節跳動是真的牛逼,郭大佬是真的有錢了。

每個人都有自己人生,郭大佬有實力又有運氣,他過的是一種極致的人生。

我是 2014 年回的洛陽,一回來就跟着一個老闆做創業項目,依稀還記得當初他給我許下的承諾:三年後讓你在洛陽買房買車,五年後帶你走上人生巔峰。

2015 年,我買了房,靠的是我和老婆辛苦攢下的一些積蓄,還有父母義無反顧的支持。老闆也借給了我兩三萬,一年後我就還他了,所以在買房這個承諾上,他有幫助,但遠非承諾中的那樣。

2016 年,我買了車,分期付款的那種,和老闆沒有一點關係。

至於五年後走上人生巔峰,更是瞎扯淡。我現在還是一名普通的程序員,生活的幸福指數也完全靠的是自己的付出。

這些年裡,老闆無數次胯下海口,聽得我耳朵都膩了。至於我為什麼還沒有離職,並不是我沉浸在溫柔故鄉,而是洛陽的軟件環境整體就這麼個樣,去哪都是打工,還不如自己踏踏實實做一些事情,比如說寫作。

對比我倆,就會發現一些很有意思的點,我來給同學們剖析一下。

1)學歷重不重要

很重要,郭大佬讀過深圳高級中學,深圳最好的高中之一,大學是暨南大學,211。

我呢,高中雖然是保送的,但那時候的學校已經走了下坡路,很動蕩,師資和校領導換了好幾波;然後我上的是一所大專。

所以我大學那會很自卑,即便專業是計算機網絡,也沒多少心思學習。而郭大佬就完全不一樣了,沉下一門心思學編程,為此還掛科了好幾門。因為他是非科班出身,專業是政治與行政管理。

假如,請允許我假如一下,給低學歷的同學們一點點信心。

假如我上大學那會一門心思撲倒編程上,大三也不至於出去參加培訓,真的,大把大把的時光我都浪費了。除了談戀愛是正事,我就只會打遊戲了。

假如拿現在的心態去過大學兩年的時光,我堅信,我一定能進阿里,因為拼過和沒拼過的人生差別巨大。

我就認識這樣一個初中小妹妹,平常老喊我二叔,搞得我都逆來順受了。她的成績非常優異,全年級第二名,為什麼不是第一名,因為語文成績拖了後腿。這不是關鍵,關鍵是小妹妹現在就開始學編程了,還去給初一的同學授過課。

後生可畏。

所以,我的結論就是,能通過學習改變命運,就下勁學,錯過這個年紀就真的沒機會了。如果真的上了大專,上了一般的本科,也不是沒有機會,別整天喊自己迷茫,誰的青春沒有迷茫過,關鍵是要發掘自己的興趣點,如果要從事程序員這個行業,就好好學編程。

2)要不要創業

十個創業九個坑,我只能這麼說,能進大廠進大廠,進不去進中廠,進不去中廠進小廠。如果非要創業,也得你自身實力夠硬,假如創業失敗,你還有出路,或者實在是沒有其他更好的選擇了,再選擇創業。

職場新人最好不要被忽悠去創業,太慘了。

你看,人家郭大佬在支付寶鍍了一層金,然後所在公司稀里糊塗被字節跳動收購了,這是運氣,沒得說。

大部分人的命運可能像我一樣,在日企待過三年半,有了一些資歷,然後作為技術大拿參与到創業公司,一開始老闆牛逼吹上天,最後,啥也沒撈着。

青春荒廢了,人際關係荒廢了,程序員的黃金年齡段也荒廢了。

3)要不要提前退休

有不少同學問過我這樣的話題,“二哥,我馬上到了結婚的年紀,雖然在一線城市掙得還可以,但遠沒到能買得起房子的水準,可能這輩子都不可能了,我想現在回二線城市或者三線城市,你看可行嗎?”

這種想法,其實就和郭大佬退休的想法是一致的,只不過郭財務自由了。

對於普通人來說,我的建議是這樣的,請認認真真做好筆記。

第一,不要盲目回二三線。

拿洛陽來說吧,一般程序員的極限工資就是一萬塊,撐死的那種。五險一金,包括獎金,能沒有公司就考慮沒有。

捫心自問一下,自己能否承受得起這份清心寡慾。另外,二三線城市也是會加班的,關鍵是不一定有加班工資。

第二,搞一份副業吧,同學們。

在一線城市,你可能沒有精力和時間搞副業,另外,主業的成長潛力並不比副業差,搞的意義不是特別大。但如果要回二三線,副業必須得搞,哪怕掙個零花錢,心裏不慌。

幸福指數高不高,離不開錢那,雖然很俗。粗茶淡飯沒問題,二三線城市的生活成本相對較低是真的,但你的掙錢能力也得匹配上吧,匹配不上的話,活得就會很累的。

我羡慕郭大佬,有些同學可能羡慕我,覺得我的幸福指數也很高。那我要告訴你的真相就是,我既要忙工作,還要讀書寫作,也是很拼的。

人生就是這樣,為別人的成功送上祝福的同時,不要忘記腳踏實地地活着。

如果覺得文章對你有點幫助,請微信搜索「 沉默王二 」第一時間閱讀。

本文已收錄 GitHub,傳送門~ ,裏面更有大廠面試完整考點,歡迎 Star。

我是沉默王二,一枚有顏值卻靠才華苟且的程序員。關注即可提升學習效率,別忘了三連啊,點贊、收藏、留言,我不挑,嘻嘻

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司“嚨底家”!

※推薦評價好的iphone維修中心

聚甘新