地理文本處理技術在高德的演進(上)

一、背景

地圖App的功能可以簡單概括為定位,搜索,導航三部分,分別解決在哪裡,去哪裡,和怎麼去的問題。高德地圖的搜索場景下,輸入的是,地理相關的檢索query,用戶位置,App圖面等信息,輸出的是,用戶想要的POI。如何能夠更加精準地找到用戶想要的POI,提高滿意度,是評價搜索效果的最關鍵指標。

一個搜索引擎通常可以拆分成query分析、召回、排序三個部分,query分析主要是嘗試理解query表達的含義,為召回和排序給予指導。

地圖搜索的query分析不僅包括通用搜索下的分詞,成分分析,同義詞,糾錯等通用NLP技術,還包括城市分析,wherewhat分析,路徑規劃分析等特定的意圖理解方式。

常見的一些地圖場景下的query意圖表達如下:

query分析是搜索引擎中策略密集的場景,通常會應用NLP領域的各種技術。地圖場景下的query分析,只需要處理地理相關的文本,多樣性不如網頁搜索,看起來會簡單一些。但是,地理文本通常比較短,並且用戶大部分的需求是唯一少量結果,要求精準度非常高,如何能夠做好地圖場景下的文本分析,並提升搜索結果的質量,是充滿挑戰的。

二、整體技術架構

搜索架構

類似於通用檢索的架構,地圖的檢索架構包括query分析,召回,排序三個主要部分。先驗的,用戶的輸入信息可以理解為多種意圖的表達,同時下發請求嘗試獲取檢索結果。后驗的,拿到每種意圖的檢索結果時,進行綜合判斷,選擇效果最好的那個。

query分析流程

具體的意圖理解可分為基礎query分析和應用query分析兩部分,基礎query分析主要是使用一些通用的NLP技術對query進行理解,包括分析,成分分析,省略,同義詞,糾錯等。應用query分析主要是針對地圖場景里的特定問題,包括分析用戶目標城市,是否是where+what表達,是否是從A到B的路徑規劃需求表達等。

整體技術演進

在地里文本處理上整體的技術演進經歷了規則為主,到逐步引入機器學習,到機器學習全面應用的過程。由於搜索模塊是一個高併發的線上服務,對於深度模型的引入有比較苛刻的條件,但隨着性能問題逐漸被解決,我們從各個子方向逐步引入深度學習的技術,進行新一輪的效果提升。

NLP領域技術在最近幾年取得了日新月異的發展,bert,XLNet等模型相繼霸榜,我們逐步統一化各個query分析子任務,使用統一的向量表示對進行用戶需求進行表達,同時進行seq2seq的多任務學習,在效果進一步提升的基礎上,也能夠保證系統不會過於臃腫。

本文就高德地圖搜索的地理文本處理,介紹相關的技術在過去幾年的演進。我們將選取一些點分上下兩篇進行介紹,上篇主要介紹搜索引擎中一些通用的query分析技術,包括糾錯,改寫和省略。下篇着重介紹地圖場景中特有query分析技術,包括城市分析,wherewhat分析,路徑規劃。

三、通用query分析技術演進

3.1 糾錯

在搜索引擎中,用戶輸入的檢索詞(query)經常會出現拼寫錯誤。如果直接對錯誤的query進行檢索,往往不會得到用戶想要的結果。因此不管是通用搜索引擎還是垂直搜索引擎,都會對用戶的query進行糾錯,最大概率獲得用戶想搜的query。

在目前的地圖搜索中,約有6%-10%的用戶請求會輸入錯誤,所以query糾錯在地圖搜索中是一個很重要的模塊,能夠極大的提升用戶搜索體驗。

在搜索引擎中,低頻和中長尾問題往往比較難解決,也是糾錯模塊面臨的主要問題。另外,地圖搜索和通用搜索,存在一個明顯的差異,地圖搜索query結構化比較突出,query中的片段往往包含一定的位置信息,如何利用好query中的結構化信息,更好地識別用戶意圖,是地圖糾錯獨有的挑戰。

常見錯誤分類

(1) 拼音相同或者相近,例如: 盤橋物流園-潘橋物流園
(2) 字形相近,例如: 河北冒黎-河北昌黎
(3) 多字或者漏字,例如: 泉州州頂街-泉州頂街

糾錯現狀

原始糾錯模塊包括多種召回方式,如:

拼音糾錯:主要解決短query的拼音糾錯問題,拼音完全相同或者模糊音作為糾錯候選。
拼寫糾錯:也叫形近字糾錯,通過遍歷替換形近字,用query熱度過濾,加入候選。
組合糾錯:通過翻譯模型進行糾錯替換,資源主要是通過query對齊挖掘的各種替換資源。

組合糾錯翻譯模型計算公式:

其中p(f)是語言模型,p(f|e)是替換模型。

問題1:召回方式存在缺陷。目前query糾錯模塊主要召回策略包括拼音召回、形近字召回,以及替換資源召回。對於低頻case,解決能力有限。

問題2:排序方式不合理。糾錯按照召回方式分為幾個獨立的模塊,分別完成相應的召回和排序,不合理。

技術改造

改造1:基於空間關係的實體糾錯
原始的糾錯主要是基於用戶session挖掘片段替換資源,所以對於低頻問題解決能力有限。但是長尾問題往往集中在低頻,所以低頻問題是當前的痛點。

地圖搜索與通用搜索引擎有個很大的區別在於,地圖搜索query比較結構化,例如北京市朝陽區阜榮街10號首開廣場。我們可以對query進行結構化切分(也就是地圖中成分分析的工作),得到這樣一種帶有類別的結構化描述,北京市【城市】朝陽區【區縣】阜榮街【道路】10號【門址後綴】首開廣場【通用實體】。

同時,我們擁有權威的地理知識數據,利用權威化的地理實體庫進行前綴樹+後綴樹的索引建庫,提取疑似糾錯的部分在索引庫中進行拉鏈召回,同時利用實體庫中的邏輯隸屬關係對糾錯結果進行過濾。實踐表明,這種方式對低頻的區劃或者實體的錯誤有着明顯的作用。

基於字根的字形相似度計算

上文提到的排序策略裏面通過字形的編輯距離作為排序的重要特徵,這裏我們開發了一個基於字根的字形相似度計算策略,對於編輯距離的計算更為細化和準確。漢字信息有漢字的字根拆分詞表和漢字的筆畫數。

將一個漢字拆分成多個字根,尋找兩個字的公共字根,根據公共字根筆畫數來計算連個字的相似度。

改造2:排序策略重構

原始的策略召回和排序策略耦合,導致不同的召回鏈路,存在顧此失彼的情況。為了能夠充分發揮各種召回方式的優勢,急需要對召回和排序進行解耦並進行全局排序優化。為此我們增加了排序模塊,將流程分為召回和排序兩階段。

模型選擇

對於這個排序問題,這裏我們參考業界的實踐,使用了基於pair-wise的gbrank進行模型訓練。

樣本建設

通過線上輸出結合人工review的方式構造樣本。

特徵建設
(1) 語義特徵。如統計語言模型。
(2) 熱度特徵。pv,點擊等。
(3) 基礎特徵。編輯距離,切詞和成分特徵,累積分佈特徵等。

這裏解決了糾錯模塊兩個痛點問題,一個是在地圖場景下的大部分低頻糾錯問題。另一個是重構了模塊流程,將召回和排序解耦,充分發揮各個召回鏈路的作用,召回方式更新后只需要重訓排序模型即可,使得模塊更加合理,為後面的深度模型升級打下良好的基礎。後面在這個框架下,我們通過深度模型進行seq2seq的糾錯召回,取得了進一步的收益。

3.2 改寫

糾錯作為query變換的一種方式的召回策略存在諸多限制,對於一些非典型的query變換表達,存在策略的空白。比如query=永城市新農合辦,目標POI是永城市新農合服務大廳。用戶的低頻query,往往得不到較好搜索效果,但其實用戶描述的語義與主poi的高頻query是相似的。

這裏我們提出一種query改寫的思路,可以將低頻query改寫成語義相似的高頻query,以更好地滿足用戶需求多樣性的表達。

這是一個從無到有的實現。用戶表達的query是多樣的,使用規則表達顯然是難以窮盡的,直觀的思路是通過向量的方式召回,但是向量召回的方式很可能出現泛化過多,不適應地圖場景的檢索的問題,這些都是要在實踐過程中需要考慮的問題。

方案

整體看,方案包括召回,排序,過濾,三個階段。

召回階段

我們調研了句子向量表示的幾種方法,選擇了算法簡單,效果和性能可以和CNN,RNN媲美的SIF(Smooth Inverse Frequency)。向量召回可以使用開源的Faiss向量搜索引擎,這裏我們使用了阿里內部的性能更好的的向量檢索引擎。

排序階段
樣本構建
原query與高頻query候選集合,計算語義相似度,選取語義相似度的TOPK,人工標註的訓練樣本。

特徵建設

1.基礎文本特徵
2.編輯距離
3.組合特徵

模型選擇

使用xgboost進行分數回歸

過濾階段
通過向量召回的query過度泛化非常嚴重,為了能夠在地圖場景下進行應用,增加了對齊模型。使用了兩種統計對齊模型giza和fastalign,實驗證明二者效果幾乎一致,但fastalign在性能上好於giza,所以選擇fastalign。

通過對齊概率和非對齊概率,對召回的結果進行進一步過濾,得到精度比較高的結果。

query改寫填補了原始query分析模塊中一些低頻表達無法滿足的空白,區別於同義詞或者糾錯的顯式query變換表達,句子的向量表示是相似query的一種隱式的表達,有其相應的優勢。

向量表示和召回也是深度學習模型逐步開始應用的嘗試。同義詞,改寫,糾錯,作為地圖中query變換主要的三種方式,以往在地圖模塊里比較分散,各司其職,也會有互相重疊的部分。在後續的迭代升級中,我們引入了統一的query變換模型進行改造,在取得收益的同時,也擺脫掉了過去很多規則以及模型耦合造成的歷史包袱。

3.2 省略

在地圖搜索場景里,有很多query包含無效詞,如果用全部query嘗試去召回很可能不能召回有效結果。如廈門市搜”湖裡區縣后高新技術園新捷創運營中心11樓1101室 縣后brt站”。這就需要一種檢索意圖,在不明顯轉義下,使用核心term進行召回目標poi候選集合,當搜索結果無果或者召回較差時起到補充召回的作用。

在省略判斷的過程中存在先驗后驗平衡的問題。省略意圖是一個先驗的判斷,但是期望的結果是能夠進行POI有效召回,和POI的召回字段的現狀密切相關。如何能夠在策略設計的過程中保持先驗的一致性,同時能夠在後驗POI中拿到相對好的效果,是做好省略模塊比較困難的地方。

原始的省略模塊主要是基於規則進行的,規則依賴的主要特徵是上游的成分分析特徵。由於基於規則擬合,模型效果存在比較大的優化空間。另外,由於強依賴成分分析,模型的魯棒性並不好。

技術改造

省略模塊的改造主要完成了規則到crf模型的升級,其中也離線應用了深度學習模型輔助樣本生成。

模型選擇

識別出來query哪些部分是核心哪些部分是可以省略的,是一個序列標註問題。在淺層模型的選型中,顯而易見地,我們使用了crf模型。

特徵建設

term特徵。使用了賦權特徵,詞性,先驗詞典特徵等。
成分特徵。仍然使用成分分析的特徵。
統計特徵。統計片段的左右邊界熵,城市分佈熵等,通過分箱進行離散化。

樣本建設

項目一期我們使用了使用線上策略粗標,外包細標的方式,構造了萬級的樣本供crf模型訓練。

但是省略query的多樣性很高,使用萬級的樣本是不夠的,在線上模型無法快速應用深度模型的情況下,我們使用了boostraping的方式,藉助深度模型的泛化能力,離線構造了大量樣本。

使用了這種方式,樣本從萬級很容易擴充到百萬級,我們仍然使用crf模型進行訓練和線上應用。

在省略模塊,我們完成了規則到機器學習的升級,引入了成分以外的其他特徵,提升了模型的魯棒性。同時並且利用離線深度學習的方式進行樣本構造的循環,提升了樣本的多樣性,使得模型能夠更加接近crf的天花板。

在後續深度模型的建模中,我們逐步擺脫了對成分分析特徵的依賴,對query到命中poi核心直接進行建模,構建大量樣本,取得了進一步的收益。

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

3c收購,鏡頭 收購有可能以全新價回收嗎?

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

賣IPHONE,iPhone回收,舊換新!教你怎麼賣才划算?

生產者-消費者模型在Hudi中的應用

介紹

生產者-消費者模型用於解耦生產者與消費者,平衡兩者之間的能力不平衡,該模型廣泛應用於各個系統中,Hudi也使用了該模型控制對記錄的處理,即記錄會被生產者生產至隊列中,然後由消費者從隊列中消費,更具體一點,對於更新操作,生產者會將文件中老的記錄放入隊列中等待消費者消費,消費后交由HoodieMergeHandle處理;對於插入操作,生產者會將新記錄放入隊列中等待消費者消費,消費后交由HandleCreateHandle處理。

入口

前面的文章中提到過無論是HoodieCopyOnWriteTable#handleUpdate處理更新時直接生成了一個SparkBoundedInMemoryExecutor對象,還是HoodieCopyOnWriteTable#handleInsert處理插入時生成了一個CopyOnWriteLazyInsertIterable對象,再迭代時調用該對象的CopyOnWriteLazyInsertIterable#computeNext方法生成SparkBoundedInMemoryExecutor對象。最後兩者均會調用SparkBoundedInMemoryExecutor#execute開始記錄的處理,該方法核心代碼如下

  public E execute() {
    try {
      ExecutorCompletionService<Boolean> producerService = startProducers();
      Future<E> future = startConsumer();
      // Wait for consumer to be done
      return future.get();
    } catch (Exception e) {
      throw new HoodieException(e);
    }
  }

該方法會啟動所有生產者和單個消費者進行處理。

Hudi定義了BoundedInMemoryQueueProducer接口表示生產者,其子類實現如下

  • FunctionBasedQueueProducer,基於Function來生產記錄,在合併日誌log文件和數據parquet文件時使用,以便提供RealTimeView
  • IteratorBasedQueueProducer,基於迭代器來生產記錄,在插入更新時使用。

定義了BoundedInMemoryQueueConsumer類表示消費者,其主要子類實現如下

  • CopyOnWriteLazyInsertIterable$CopyOnWriteInsertHandler,主要處理CopyOnWrite表類型時的插入。
    • MergeOnReadLazyInsertIterable$MergeOnReadInsertHandler,主要處理MergeOnRead

表類型時的插入,其為CopyOnWriteInsertHandler的子類。

  • CopyOnWriteLazyInsertIterable$UpdateHandler,主要處理CopyOnWrite表類型時的更新。

整個生產消費相關的類繼承結構非常清晰。

對於生產者的啟動,startProducers方法核心代碼如下

  public ExecutorCompletionService<Boolean> startProducers() {
    // Latch to control when and which producer thread will close the queue
    final CountDownLatch latch = new CountDownLatch(producers.size());
    final ExecutorCompletionService<Boolean> completionService =
        new ExecutorCompletionService<Boolean>(executorService);
    producers.stream().map(producer -> {
      return completionService.submit(() -> {
        try {
          preExecute();
          producer.produce(queue);
        } catch (Exception e) {
          logger.error("error producing records", e);
          queue.markAsFailed(e);
          throw e;
        } finally {
          synchronized (latch) {
            latch.countDown();
            if (latch.getCount() == 0) {
              // Mark production as done so that consumer will be able to exit
              queue.close();
            }
          }
        }
        return true;
      });
    }).collect(Collectors.toList());
    return completionService;
  }

該方法使用CountDownLatch來協調生產者線程與消費者線程的退出動作,然後調用produce方法開始生產,對於插入更新時的IteratorBasedQueueProducer而言,其核心代碼如下

  public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
    ...
    while (inputIterator.hasNext()) {
      queue.insertRecord(inputIterator.next());
    }
    ...
  }

可以看到只要迭代器還有記錄(可能為插入時的新記錄或者更新時的舊記錄),就會往隊列中不斷寫入。

對於消費者的啟動,startConsumer方法的核心代碼如下

  private Future<E> startConsumer() {
    return consumer.map(consumer -> {
      return executorService.submit(() -> {
        ...
        preExecute();
        try {
          E result = consumer.consume(queue);
          return result;
        } catch (Exception e) {
          queue.markAsFailed(e);
          throw e;
        }
      });
    }).orElse(CompletableFuture.completedFuture(null));
  }

消費時會先進行執行前的準備,然後開始消費,其中consume方法的核心代碼如下

  public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
    Iterator<I> iterator = queue.iterator();

    while (iterator.hasNext()) {
      consumeOneRecord(iterator.next());
    }

    // Notifies done
    finish();

    return getResult();
  }

可以看到只要隊列中還有記錄,就可以獲取該記錄,然後調用不同BoundedInMemoryQueueConsumer子類的consumeOneRecord進行更新插入處理。

值得一提的是Hudi對隊列進行了流控,生產者不能無限制地將記錄寫入隊列中,隊列緩存的大小由用戶配置,隊列能放入記錄的條數由採樣的記錄大小和隊列緩存大小控制。

在生產時,會調用BoundedInMemoryQueue#insertRecord將記錄寫入隊列,其核心代碼如下

  public void insertRecord(I t) throws Exception {
    ...
    rateLimiter.acquire();
    // We are retrieving insert value in the record queueing thread to offload computation
    // around schema validation
    // and record creation to it.
    final O payload = transformFunction.apply(t);
    adjustBufferSizeIfNeeded(payload);
    queue.put(Option.of(payload));
  }

首先獲取一個許可(Semaphore),未成功獲取會被阻塞直至成功獲取,然後獲取記錄的負載以便調整隊列,然後放入內部隊列(LinkedBlockingQueue)中,其中adjustBufferSizeIfNeeded方法的核心代碼如下

  private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException {
    if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
      return;
    }

    final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload);
    final long newAvgRecordSizeInBytes =
        Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
    final int newRateLimit =
        (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes));

    // If there is any change in number of records to cache then we will either release (if it increased) or acquire
    // (if it decreased) to adjust rate limiting to newly computed value.
    if (newRateLimit > currentRateLimit) {
      rateLimiter.release(newRateLimit - currentRateLimit);
    } else if (newRateLimit < currentRateLimit) {
      rateLimiter.acquire(currentRateLimit - newRateLimit);
    }
    currentRateLimit = newRateLimit;
    avgRecordSizeInBytes = newAvgRecordSizeInBytes;
    numSamples++;
  }

首先看是否已經達到採樣頻率,然後計算新的記錄平均大小和限流速率,如果新的限流速率大於當前速率,則可釋放一些許可(供阻塞的生產者獲取後繼續生產),否則需要獲取(回收)一些許可(許可變少後生產速率自然就降低了)。該操作可根據採樣的記錄大小動態調節速率,不至於在記錄負載太大和記錄負載太小時,放入同等個數,從而起到動態調節作用。

在消費時,會調用BoundedInMemoryQueue#readNextRecord讀取記錄,其核心代碼如下

  private Option<O> readNextRecord() {
    ...
    rateLimiter.release();
    Option<O> newRecord = Option.empty();
    while (expectMoreRecords()) {
      try {
        throwExceptionIfFailed();
        newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
        if (newRecord != null) {
          break;
        }
      } catch (InterruptedException e) {
        throw new HoodieException(e);
      }
    }
    ...

    if (newRecord != null && newRecord.isPresent()) {
      return newRecord;
    } else {
      // We are done reading all the records from internal iterator.
      this.isReadDone.set(true);
      return Option.empty();
    }
  }

可以看到首先會釋放一個許可,然後判斷是否還可以讀取記錄(還在生產或者停止生產但隊列不為空都可讀取),然後從內部隊列獲取記錄或返回。

上述便是生產者-消費者在Hudi中應用的分析。

總結

Hudi採用了生產者-消費者模型來控制記錄的處理,與傳統多生產者-多消費者模型不同的是,Hudi現在只支持多生產者-單消費者模型,單消費者意味着Hudi暫時不支持文件的併發寫入。而對於生產消費的隊列的實現,Hudi並未僅僅只是基於LinkedBlockingQueue,而是採用了更精細化的速率控制,保證速率會隨着記錄負載大小的變化和配置的隊列緩存大小而動態變化,這也降低了系統發生OOM的概率。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

Alibaba Nacos 學習(五):K8S Nacos搭建,使用nfs

 

準備環境

Centos7  192.168.50.21 k8s-master 2G
Centos7  192.168.50.22 k8s-node01 2G
Centos7  192.168.50.23 k8s-node02 2G

K8S集群搭建參考 

 

master安裝好Git ,yum install git

master,node01,node02  安裝 nfs-utils

yum install nfs-utils

master,node01,node02添加nfs exports配置,為了解決後續的nfs報錯異常

/data/mysql-slave *(insecure,fsid=0,rw,async,no_root_squash)
/data/mysql-master *(insecure,fsid=0,rw,async,no_root_squash)
/data/nfs-share *(rw,fsid=0,sync,no_root_squash)
mysql-slave 數據庫從庫 
mysql-master 數據庫主庫
nfs-share nocas文件掛在目錄

後面的yml中會提到
master,node01,node02創建目錄
mkdir /data/mysql-slave
mkdir /data/mysql-master
mkdir /data/nfs-share 

 master 克隆代碼

   git clone https://github.com/nacos-group/nacos-k8s.git

克隆完成進入以下目錄

 cd /opt/nacos-k8s/deploy/

 

1.nfs安裝

kubectl create -f nfs/rbac.yaml 
kubectl create -f nfs/class.yaml 

修改nfs/deployment.yaml IP配置

 

 

 

kubectl create -f nfs/deployment.yaml

查看安裝狀態

kubectl get pod -l app=nfs-client-provisioner

 

 

 

2.mysql部署

cd /opt/nacos-k8s/deploy/mysql/

修改數據配置文件ip

vi mysql-master-nfs.yaml

 

 

 部署主庫

kubectl create -f mysql-master-nfs.yaml 

修改存庫ip

vi mysql-slave-nfs.yaml
kubectl create -f mysql-slave-nfs.yaml 

主從部署非常慢 耐心等待,如果報nfs相關的錯,重啟nfs即可

service nfs restart

 

 

3. 部署nacos

cd /opt/nacos-k8s/deploy/nacos/

 

 

 

 

 

kubectl create -f nacos-pvc-nfs.yaml 

 查看訪問端口

kubectl get svc|grep nacos

 

 

 

 

 查看K8S集群狀態

 

 Failed to pull image “nacos/nacos-server:latest”: rpc error: code = Unknown desc = context canceled

進去對應節點機器 ,拉取鏡像后,重新應用即可

kubectl apply -f

 4. 部署問題

部署過程中大部分都是NFS問題

可以參考

mount.nfs: No route to host
Warning FailedMount 100s (x5 over 10m) kubelet, node2 Unable to mount volumes for pod “nfs-client-provisioner-594f778474-whhb5_default(56aef93a-9d31-11e9-a4c4-00163e069f44)”: timeout expired waiting for volumes to attach or mount for pod “default”/”nfs-client-provisioner-594f778474-whhb5”. list of unmounted volumes=[nfs-client-root]. list of unattached volumes=[nfs-client-root nfs-client-provisioner-token-8dcrx]

修改deployment.yaml中server的IP地址為某個node節點的內網IP地址,圖1已標註

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

如何使用偽類選擇器

偽類選擇器介紹

  • 偽類選擇器就是用來給超級鏈接設置不同的狀態樣式。
  • 超級鏈接分為4種狀態如:正常狀態、訪問過後狀態、鼠標放上狀態、激活狀態。

偽類選擇器說明表

選擇器 描述
:link 向未被訪問的超級鏈接添加樣式,正常狀態。
:visited 向已經被訪問的超級鏈接添加樣式,訪問過後狀態。
:hover 當鼠標懸浮在超級鏈接上方時,向超級鏈接添加樣式,鼠標放上狀態。
:active 鼠標放在超級鏈接上並且點擊的一瞬間,向超級鏈接添加樣式,激活狀態。

偽類選擇器實踐

  • 讓我們進入偽類選擇器實踐,實踐內容將超級鏈接4種狀態進行演示,演示效果如:將向未被訪問的超級鏈接文本顏色設置為紅色、已經被訪問的超級鏈接文本顏色設置為綠色、當鼠標懸浮在超級鏈接上文本顏色設置為紫色、用鼠標點擊超級鏈接的一瞬間文本顏色設置為藍色

  • 代碼塊

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>偽類選擇器</title>
    <style>
        a:link{
            color:red;
        }
        a:visited{
            color: lime;
        }
        a:hover{
            color: purple;
        }
        a:active{
            color: blue;
        }
    </style>
</head>
  
<body>
    <a href="https://www.cnblogs.com/lq000122/">微笑是最初的信仰</a>
</body>
</html>
  • 正常狀態結果圖

  • 鼠標放上狀態結果圖

  • 激活狀態結果圖

  • 訪問過後狀態

總結

  • 超級鏈接的不同狀態他其實是由順序,也就是說偽類選擇器設置其實是順序的,如果按照偽類選擇器的順序,那麼設置的樣式就不會被渲染。
  • 順序:linkvisitedhoveractive

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

3c收購,鏡頭 收購有可能以全新價回收嗎?

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

賣IPHONE,iPhone回收,舊換新!教你怎麼賣才划算?

PHP 的 self 關鍵字用法

之前有人詢問 self 關鍵字的用法,答案是比較明顯的:靜態成員函數內不能用 this 調用非成員函數,但可以用 self 調用靜態成員函數/變量/常量;其他成員函數可以用 self 調用靜態成員函數以及非靜態成員函數。隨着討論的深入,發現 self 並沒有那麼簡單。鑒於此,本文先對幾個關鍵字做對比和區分,再總結 self 的用法。

parentstatic 以及 this 的區別

要想將徹底搞懂 self ,要與 parentstatic 以及 this 區分開。以下分別做對比。

parent

selfparent 的區分比較容易: parent 引用父類/基類被隱蓋的方法(或變量), self則引用自身方法(或變量)。例如構造函數中調用父類構造函數:

class Base {
    public function __construct() {
        echo "Base contructor!", PHP_EOL;
    }
}

class Child {
    public function __construct() {
        parent::__construct();
        echo "Child contructor!", PHP_EOL;
    }
}

new Child;
// 輸出:
// Base contructor!
// Child contructor!

 

static

static 常規用途是修飾函數或變量使其成為類函數和類變量,也可以修飾函數內變量延長其生命周期至整個應用程序的生命周期。但是其與 self 關聯上是PHP 5.3以來引入的新用途:靜態延遲綁定。

有了 static 的靜態延遲綁定功能,可以在運行時動態確定歸屬的類。例如:

class Base {
    public function __construct() {
        echo "Base constructor!", PHP_EOL;
    }

    public static function getSelf() {
        return new self();
    }

    public static function getInstance() {
        return new static();
    }

    public function selfFoo() {
        return self::foo();
    }

    public function staticFoo() {
        return static::foo();
    }

    public function thisFoo() {
        return $this->foo();
    }

    public function foo() {
        echo  "Base Foo!", PHP_EOL;
    }
}

class Child extends Base {
    public function __construct() {
        echo "Child constructor!", PHP_EOL;
    }

    public function foo() {
        echo "Child Foo!", PHP_EOL;
    }
}

$base = Child::getSelf();
$child = Child::getInstance();

$child->selfFoo();
$child->staticFoo();
$child->thisFoo();

 

程序輸出結果如下:

Base constructor!
Child constructor!
Base Foo!
Child Foo!
Child Foo!

 

在函數引用上, selfstatic 的區別是:對於靜態成員函數, self 指向代碼當前類, static 指向調用類;對於非靜態成員函數, self 抑制多態,指向當前類的成員函數, static 等同於 this ,動態指向調用類的函數。

parentselfstatic 三個關鍵字聯合在一起看挺有意思,分別指向父類、當前類、子類,有點“過去、現在、未來”的味道。

this

selfthis 是被討論最多,也是最容易引起誤用的組合。兩者的主要區別如下:

  1. this 不能用在靜態成員函數中, self 可以;
  2. 對靜態成員函數/變量的訪問, 建議 用 self ,不要用 $this::$this-> 的形式;
  3. 對非靜態成員變量的訪問,不能用 self ,只能用 this ;
  4. this 要在對象已經實例化的情況下使用, self 沒有此限制;
  5. 在非靜態成員函數內使用, self 抑制多態行為,引用當前類的函數;而 this 引用調用類的重寫(override)函數(如果有的話)。

self 的用途

看完與上述三個關鍵字的區別, self 的用途是不是呼之即出?一句話總結,那就是: self總是指向“當前類(及類實例)”。詳細說則是:

  1. 替代類名,引用當前類的靜態成員變量和靜態函數;
  2. 抑制多態行為,引用當前類的函數而非子類中覆蓋的實現;

槽點

  1. 這幾個關鍵字中,只有 this 要加 $ 符號且必須加,強迫症表示很難受;
  2. 靜態成員函數中不能通過 $this-> 調用非靜態成員函數,但是可以通過 self:: 調用,且在調用函數中未使用 $this-> 的情況下還能順暢運行。此行為貌似在不同PHP版本中表現不同,在當前的7.3中ok;
  3. 在靜態函數和非靜態函數中輸出 self ,猜猜結果是什麼?都是 string(4) "self" ,迷之輸出;
  4. return $this instanceof static::class; 會有語法錯誤,但是以下兩種寫法就正常:
    $class = static::class;
    return $this instanceof $class;
    // 或者這樣:
    return $this instanceof static;

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

帶你漲姿勢的認識一下 Kafka 消費者

之前我們介紹過了 Kafka 整體架構,Kafka 生產者,Kafka 生產的消息最終流向哪裡呢?當然是需要消費了,要不只產生一系列數據沒有任何作用啊,如果把 Kafka 比作餐廳的話,那麼生產者就是廚師的角色,消費者就是客人,只有廚師的話,那麼炒出來的菜沒有人吃也沒有意義,如果只有客人沒有廚師的話,誰會去這個店吃飯呢?!所以如果你看完前面的文章意猶未盡的話,可以繼續讓你爽一爽。如果你沒看過前面的文章,那就從現在開始讓你爽。

Kafka 消費者概念

應用程序使用 KafkaConsumer 從 Kafka 中訂閱主題並接收來自這些主題的消息,然後再把他們保存起來。應用程序首先需要創建一個 KafkaConsumer 對象,訂閱主題並開始接受消息,驗證消息並保存結果。一段時間后,生產者往主題寫入的速度超過了應用程序驗證數據的速度,這時候該如何處理?如果只使用單個消費者的話,應用程序會跟不上消息生成的速度,就像多個生產者像相同的主題寫入消息一樣,這時候就需要多個消費者共同參与消費主題中的消息,對消息進行分流處理。

Kafka 消費者從屬於消費者群組。一個群組中的消費者訂閱的都是相同的主題,每個消費者接收主題一部分分區的消息。下面是一個 Kafka 分區消費示意圖

上圖中的主題 T1 有四個分區,分別是分區0、分區1、分區2、分區3,我們創建一個消費者群組1,消費者群組中只有一個消費者,它訂閱主題T1,接收到 T1 中的全部消息。由於一個消費者處理四個生產者發送到分區的消息,壓力有些大,需要幫手來幫忙分擔任務,於是就演變為下圖

這樣一來,消費者的消費能力就大大提高了,但是在某些環境下比如用戶產生消息特別多的時候,生產者產生的消息仍舊讓消費者吃不消,那就繼續增加消費者。

如上圖所示,每個分區所產生的消息能夠被每個消費者群組中的消費者消費,如果向消費者群組中增加更多的消費者,那麼多餘的消費者將會閑置,如下圖所示

向群組中增加消費者是橫向伸縮消費能力的主要方式。總而言之,我們可以通過增加消費組的消費者來進行水平擴展提升消費能力。這也是為什麼建議創建主題時使用比較多的分區數,這樣可以在消費負載高的情況下增加消費者來提升性能。另外,消費者的數量不應該比分區數多,因為多出來的消費者是空閑的,沒有任何幫助。

Kafka 一個很重要的特性就是,只需寫入一次消息,可以支持任意多的應用讀取這個消息。換句話說,每個應用都可以讀到全量的消息。為了使得每個應用都能讀到全量消息,應用需要有不同的消費組。對於上面的例子,假如我們新增了一個新的消費組 G2,而這個消費組有兩個消費者,那麼就演變為下圖這樣

在這個場景中,消費組 G1 和消費組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來說它們屬於不同的應用。

總結起來就是如果應用需要讀取全量消息,那麼請為該應用設置一個消費組;如果該應用消費能力不足,那麼可以考慮在這個消費組裡增加消費者

消費者組和分區重平衡

消費者組是什麼

消費者組(Consumer Group)是由一個或多個消費者實例(Consumer Instance)組成的群組,具有可擴展性和可容錯性的一種機制。消費者組內的消費者共享一個消費者組ID,這個ID 也叫做 Group ID,組內的消費者共同對一個主題進行訂閱和消費,同一個組中的消費者只能消費一個分區的消息,多餘的消費者會閑置,派不上用場。

我們在上面提到了兩種消費方式

  • 一個消費者群組消費一個主題中的消息,這種消費模式又稱為點對點的消費方式,點對點的消費方式又被稱為消息隊列
  • 一個主題中的消息被多個消費者群組共同消費,這種消費模式又稱為發布-訂閱模式

消費者重平衡

我們從上面的消費者演變圖中可以知道這麼一個過程:最初是一個消費者訂閱一個主題並消費其全部分區的消息,後來有一個消費者加入群組,隨後又有更多的消費者加入群組,而新加入的消費者實例分攤了最初消費者的部分消息,這種把分區的所有權通過一個消費者轉到其他消費者的行為稱為重平衡,英文名也叫做 Rebalance 。如下圖所示

重平衡非常重要,它為消費者群組帶來了高可用性伸縮性,我們可以放心的添加消費者或移除消費者,不過在正常情況下我們並不希望發生這樣的行為。在重平衡期間,消費者無法讀取消息,造成整個消費者組在重平衡的期間都不可用。另外,當分區被重新分配給另一個消費者時,消息當前的讀取狀態會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態之前會拖慢應用程序。

消費者通過向組織協調者(Kafka Broker)發送心跳來維護自己是消費者組的一員並確認其擁有的分區。對於不同不的消費群體來說,其組織協調者可以是不同的。只要消費者定期發送心跳,就會認為消費者是存活的並處理其分區中的消息。當消費者檢索記錄或者提交它所消費的記錄時就會發送心跳。

如果過了一段時間 Kafka 停止發送心跳了,會話(Session)就會過期,組織協調者就會認為這個 Consumer 已經死亡,就會觸發一次重平衡。如果消費者宕機並且停止發送消息,組織協調者會等待幾秒鐘,確認它死亡了才會觸發重平衡。在這段時間里,死亡的消費者將不處理任何消息。在清理消費者時,消費者將通知協調者它要離開群組,組織協調者會觸發一次重平衡,盡量降低處理停頓。

重平衡是一把雙刃劍,它為消費者群組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點(bug),而這些 bug 到現在社區還無法修改。

重平衡的過程對消費者組有極大的影響。因為每次重平衡過程中都會導致萬物靜止,參考 JVM 中的垃圾回收機制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機》中 p76 關於 Serial 收集器的描述):

更重要的是它在進行垃圾收集時,必須暫停其他所有的工作線程。直到它收集結束。Stop The World 這個名字聽起來很帥,但這項工作實際上是由虛擬機在後台自動發起並完成的,在用戶不可見的情況下把用戶正常工作的線程全部停掉,這對很多應用來說都是難以接受的。

也就是說,在重平衡期間,消費者組中的消費者實例都會停止消費,等待重平衡的完成。而且重平衡這個過程很慢……

創建消費者

上面的理論說的有點多,下面就通過代碼來講解一下消費者是如何消費的

在讀取消息之前,需要先創建一個 KafkaConsumer 對象。創建 KafkaConsumer 對象與創建 KafkaProducer 對象十分相似 — 把需要傳遞給消費者的屬性放在 properties 對象中,後面我們會着重討論 Kafka 的一些配置,這裏我們先簡單的創建一下,使用3個屬性就足矣,分別是 bootstrap.serverkey.deserializervalue.deserializer

這三個屬性我們已經用過很多次了,如果你還不是很清楚的話,可以參考

還有一個屬性是 group.id 這個屬性不是必須的,它指定了 KafkaConsumer 是屬於哪個消費者群組。創建不屬於任何一個群組的消費者也是可以的

Properties properties = new Properties();
        properties.put("bootstrap.server","192.168.1.9:9092");     properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");   properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);

主題訂閱

創建好消費者之後,下一步就開始訂閱主題了。subscribe() 方法接受一個主題列表作為參數,使用起來比較簡單

consumer.subscribe(Collections.singletonList("customerTopic"));

為了簡單我們只訂閱了一個主題 customerTopic,參數傳入的是一個正則表達式,正則表達式可以匹配多個主題,如果有人創建了新的主題,並且主題的名字與正則表達式相匹配,那麼會立即觸發一次重平衡,消費者就可以讀取新的主題。

要訂閱所有與 test 相關的主題,可以這樣做

consumer.subscribe("test.*");

輪詢

我們知道,Kafka 是支持訂閱/發布模式的,生產者發送數據給 Kafka Broker,那麼消費者是如何知道生產者發送了數據呢?其實生產者產生的數據消費者是不知道的,KafkaConsumer 採用輪詢的方式定期去 Kafka Broker 中進行數據的檢索,如果有數據就用來消費,如果沒有就再繼續輪詢等待,下面是輪詢等待的具體實現

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> record : records) {
      int updateCount = 1;
      if (map.containsKey(record.value())) {
        updateCount = (int) map.get(record.value() + 1);
      }
      map.put(record.value(), updateCount);
    }
  }
}finally {
  consumer.close();
}
  • 這是一個無限循環。消費者實際上是一個長期運行的應用程序,它通過輪詢的方式向 Kafka 請求數據。
  • 第三行代碼非常重要,Kafka 必須定期循環請求數據,否則就會認為該 Consumer 已經掛了,會觸發重平衡,它的分區會移交給群組中的其它消費者。傳給 poll() 方法的是一個超市時間,用 java.time.Duration 類來表示,如果該參數被設置為 0 ,poll() 方法會立刻返回,否則就會在指定的毫秒數內一直等待 broker 返回數據。
  • poll() 方法會返回一個記錄列表。每條記錄都包含了記錄所屬主題的信息,記錄所在分區的信息、記錄在分區中的偏移量,以及記錄的鍵值對。我們一般會遍歷這個列表,逐條處理每條記錄。
  • 在退出應用程序之前使用 close() 方法關閉消費者。網絡連接和 socket 也會隨之關閉,並立即觸發一次重平衡,而不是等待群組協調器發現它不再發送心跳並認定它已經死亡。

線程安全性

在同一個群組中,我們無法讓一個線程運行多個消費者,也無法讓多個線程安全的共享一個消費者。按照規則,一個消費者使用一個線程,如果一個消費者群組中多個消費者都想要運行的話,那麼必須讓每個消費者在自己的線程中運行,可以使用 Java 中的 ExecutorService 啟動多個消費者進行進行處理。

消費者配置

到目前為止,我們學習了如何使用消費者 API,不過只介紹了幾個最基本的屬性,Kafka 文檔列出了所有與消費者相關的配置說明。大部分參數都有合理的默認值,一般不需要修改它們,下面我們就來介紹一下這些參數。

  • fetch.min.bytes

該屬性指定了消費者從服務器獲取記錄的最小字節數。broker 在收到消費者的數據請求時,如果可用的數據量小於 fetch.min.bytes 指定的大小,那麼它會等到有足夠的可用數據時才把它返回給消費者。這樣可以降低消費者和 broker 的工作負載,因為它們在主題使用頻率不是很高的時候就不用來回處理消息。如果沒有很多可用數據,但消費者的 CPU 使用率很高,那麼就需要把該屬性的值設得比默認值大。如果消費者的數量比較多,把該屬性的值調大可以降低 broker 的工作負載。

  • fetch.max.wait.ms

我們通過上面的 fetch.min.bytes 告訴 Kafka,等到有足夠的數據時才會把它返回給消費者。而 fetch.max.wait.ms 則用於指定 broker 的等待時間,默認是 500 毫秒。如果沒有足夠的數據流入 kafka 的話,消費者獲取的最小數據量要求就得不到滿足,最終導致 500 毫秒的延遲。如果要降低潛在的延遲,就可以把參數值設置的小一些。如果 fetch.max.wait.ms 被設置為 100 毫秒的延遲,而 fetch.min.bytes 的值設置為 1MB,那麼 Kafka 在收到消費者請求后,要麼返回 1MB 的數據,要麼在 100 ms 后返回所有可用的數據。就看哪個條件首先被滿足。

  • max.partition.fetch.bytes

該屬性指定了服務器從每個分區里返回給消費者的最大字節數。它的默認值時 1MB,也就是說,KafkaConsumer.poll() 方法從每個分區里返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節。如果一個主題有20個分區和5個消費者,那麼每個消費者需要至少4 MB的可用內存來接收記錄。在為消費者分配內存時,可以給它們多分配一些,因為如果群組裡有消費者發生崩潰,剩下的消費者需要處理更多的分區。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節數(通過 max.message.size 屬性配置大),否則消費者可能無法讀取這些消息,導致消費者一直掛起重試。 在設置該屬性時,另外一個考量的因素是消費者處理數據的時間。消費者需要頻繁的調用 poll() 方法來避免會話過期和發生分區再平衡,如果單次調用poll() 返回的數據太多,消費者需要更多的時間進行處理,可能無法及時進行下一個輪詢來避免會話過期。如果出現這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會話過期時間。

  • session.timeout.ms

這個屬性指定了消費者在被認為死亡之前可以與服務器斷開連接的時間,默認是 3s。如果消費者沒有在 session.timeout.ms 指定的時間內發送心跳給群組協調器,就會被認定為死亡,協調器就會觸發重平衡。把它的分區分配給消費者群組中的其它消費者,此屬性與 heartbeat.interval.ms 緊密相關。heartbeat.interval.ms 指定了 poll() 方法向群組協調器發送心跳的頻率,session.timeout.ms 則指定了消費者可以多久不發送心跳。所以,這兩個屬性一般需要同時修改,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那麼 heartbeat.interval.ms 應該是 1s。把 session.timeout.ms 值設置的比默認值小,可以更快地檢測和恢復崩憤的節點,不過長時間的輪詢或垃圾收集可能導致非預期的重平衡。把該屬性的值設置得大一些,可以減少意外的重平衡,不過檢測節點崩潰需要更長的時間。

  • auto.offset.reset

該屬性指定了消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下的該如何處理。它的默認值是 latest,意思指的是,在偏移量無效的情況下,消費者將從最新的記錄開始讀取數據。另一個值是 earliest,意思指的是在偏移量無效的情況下,消費者將從起始位置處開始讀取分區的記錄。

  • enable.auto.commit

我們稍後將介紹幾種不同的提交偏移量的方式。該屬性指定了消費者是否自動提交偏移量,默認值是 true,為了盡量避免出現重複數據和數據丟失,可以把它設置為 false,由自己控制何時提交偏移量。如果把它設置為 true,還可以通過 auto.commit.interval.ms 屬性來控制提交的頻率

  • partition.assignment.strategy

我們知道,分區會分配給群組中的消費者。PartitionAssignor 會根據給定的消費者和主題,決定哪些分區應該被分配給哪個消費者,Kafka 有兩個默認的分配策略RangeRoundRobin

  • client.id

該屬性可以是任意字符串,broker 用他來標識從客戶端發送過來的消息,通常被用在日誌、度量指標和配額中

  • max.poll.records

該屬性用於控制單次調用 call() 方法能夠返回的記錄數量,可以幫你控制在輪詢中需要處理的數據量。

  • receive.buffer.bytes 和 send.buffer.bytes

socket 在讀寫數據時用到的 TCP 緩衝區也可以設置大小。如果它們被設置為 -1,就使用操作系統默認值。如果生產者或消費者與 broker 處於不同的數據中心內,可以適當增大這些值,因為跨數據中心的網絡一般都有比較高的延遲和比較低的帶寬。

提交和偏移量的概念

特殊偏移

我們上面提到,消費者在每次調用poll() 方法進行定時輪詢的時候,會返回由生產者寫入 Kafka 但是還沒有被消費者消費的記錄,因此我們可以追蹤到哪些記錄是被群組裡的哪個消費者讀取的。消費者可以使用 Kafka 來追蹤消息在分區中的位置(偏移量)

消費者會向一個叫做 _consumer_offset 的特殊主題中發送消息,這個主題會保存每次所發送消息中的分區偏移量,這個主題的主要作用就是消費者觸發重平衡後記錄偏移使用的,消費者每次向這個主題發送消息,正常情況下不觸發重平衡,這個主題是不起作用的,當觸發重平衡后,消費者停止工作,每個消費者可能會分到對應的分區,這個主題就是讓消費者能夠繼續處理消息所設置的。

如果提交的偏移量小於客戶端最後一次處理的偏移量,那麼位於兩個偏移量之間的消息就會被重複處理

如果提交的偏移量大於最後一次消費時的偏移量,那麼處於兩個偏移量中間的消息將會丟失

既然_consumer_offset 如此重要,那麼它的提交方式是怎樣的呢?下面我們就來說一下

提交方式

KafkaConsumer API 提供了多種方式來提交偏移量

自動提交

最簡單的方式就是讓消費者自動提交偏移量。如果 enable.auto.commit 被設置為true,那麼每過 5s,消費者會自動把從 poll() 方法輪詢到的最大偏移量提交上去。提交時間間隔由 auto.commit.interval.ms 控制,默認是 5s。與消費者里的其他東西一樣,自動提交也是在輪詢中進行的。消費者在每次輪詢中會檢查是否提交該偏移量了,如果是,那麼就會提交從上一次輪詢中返回的偏移量。

提交當前偏移量

auto.commit.offset 設置為 false,可以讓應用程序決定何時提交偏移量。使用 commitSync() 提交偏移量。這個 API 會提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。

commitSync() 將會提交由 poll() 返回的最新偏移量,如果處理完所有記錄后要確保調用了 commitSync(),否則還是會有丟失消息的風險,如果發生了在均衡,從最近一批消息到發生在均衡之間的所有消息都將被重複處理。

異步提交

異步提交 commitAsync() 與同步提交 commitSync() 最大的區別在於異步提交不會進行重試,同步提交會一致進行重試。

同步和異步組合提交

一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大的問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。但是如果在關閉消費者或再均衡前的最後一次提交,就要確保提交成功。

因此,在消費者關閉之前一般會組合使用commitAsync和commitSync提交偏移量

提交特定的偏移量

消費者API允許調用 commitSync() 和 commitAsync() 方法時傳入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。

文章參考:

《極客時間-Kafka核心技術與實戰》

《Kafka 權威指南》

關注公眾號獲取更多優質电子書,關注一下你就知道資源是有多好了

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

特斯拉發表電動卡車 Cybertruck,一台從科幻電影走出來的鋼鐵車

今天的 Elon Musk 看起來不像鋼鐵人,更像蝙蝠俠,因為他們的新車 Cybertruck,不但外型酷似蝙蝠車,同時還能防彈防撞,並且擁有超越保時捷的加速度,跟勝過市面上卡車的拖吊能力,更驚人的是,售價只要 39,900 美元起。

眾所期待的特斯拉新車 Cybertruck 今日正式發表,和之前流出的影像不同,Cybertruck 酷似隱形戰機 F-117 的設計,讓人聯想到蝙蝠車,甚至懷疑這是不是一台防雷達偵測的戰車?

Tesla Cybertruck 全車採用冷鑄鋼板,能夠抵擋 9mm 口徑手槍的射擊,現場展示用重鎚敲擊也毫髮無傷;車窗玻璃同樣採用防彈設計,然而有趣的是,現場展示時,被大鐵球砸出了一片雪花。「至少,它沒被打穿,你坐在裡面很安全。」Elon Musk 笑著說。

Cybertruck 為了因應負重,搭載了適應性氣壓懸吊系統,針對高速公路,或是越野泥巴路,能夠自動調整懸吊高度,同時也順便使用這個氣壓系統,做了一個高壓出力裝置,使用者可以自行加裝不同氣壓工具,像是高壓水槍或是電鑽等。

當重裝電動機車開上後廂時,懸吊系統會自動調整車尾高度,讓車身保持平衡。

車尾與其他皮卡車開放式貨斗不同,Cybertruck 採用封閉式貨斗,並有升降式尾門,現場展示時,將這台電動機車 ATV 直接騎上貨斗後,還能直接充電,顯然是在致敬蝙蝠車跟蝙蝠機車。

Cybertruck 如同其他皮卡車,車尾裝有釣鉤,能夠充當拖車使用,而歸功於它的強力馬達,拖車能力屌打了皮卡車霸主 Ford F-150,在現場展示的影片中,特斯拉讓 Cybertruck 跟 F-150 互相拖住對方,進行拔河測試,結果 F-150 整台被 Cybertruck 拖走。

F-150 慘遭 Cybertruck 拖走。

馬斯克強調,一般皮卡車需要另外裝載發電機才能使用電動工具,Cybertruck 直接提供了電源,因此省下不少空間,同時還提供強大的拖力。

此外,做為一台卡車,Cybertruck 莫名其妙地擁有超越保時捷的加速度,根據現場公布數據,最頂級版的 0-100 公里加速時間不到 3 秒。現場展示了 Cybertruck 與 Porsche 911 賽跑的影片,起步雖然小輸一點,但隨後就超越了 911。

現場展示競速影片,大約 1 秒後,Cybertruck 就超過了 911。

Tesla Cybertruck 共有 3 種版本,依照馬達數量來分別,最低價 39,900 美元起,最高 69,900 美元。Cybertruck 從今天起在美國開放預購,實際交車時間預計要等到 2021 年底。頂級的三馬達款,更預計要等到 2022 年底才會開始生產。

如同馬斯克開場所說,卡車在過去幾十年來都長得差不多,特斯拉要打造一台完全不一樣的卡車,同時還要保持零排放,跟超高性能,從今天的現場展示來看,特斯拉再次完成一個不可能的任務。在興奮之餘也別忘了,這一切都是現場展示,實際上如何,就有待實際交車後驗證了!

(合作媒體:。圖片來源:)

本站聲明:網站內容來源於EnergyTrend https://www.energytrend.com.tw/ev/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

自己動手實現分佈式任務調度框架(續)

  之前寫過一篇:本來是用來閑來分享一下自己的思維方式,時至今日發現居然有些人正在使用了,本着對代碼負責人的態度,對代碼部分已知bug進行了修改,並增加了若干功能,如立即啟動,實時停止等功能,新增加的功能會在這一篇做詳細的說明。

  提到分佈式任務調度,市面上本身已經有一些框架工具可以使用,但是個人覺得功能做的都太豐富,架構都過於複雜,所以才有了我重複造輪子。個人喜歡把複雜的問題簡單化,利用有限的資源實現竟可能多的功能。因為有幾個朋友問部署方式,這裏再次強調下:我的這個服務可以直接打成jar放在自己本地倉庫,然後依賴進去,或者直接copy代碼過去,當成自己項目的一部分就可以了。也就是說跟隨你們自己的項目啟動,所以我這裏也沒有寫界面。下面先談談怎麼基於上次的代碼實現任務立即啟動吧!

  調度和自己服務整合後部署圖抽象成如下:

  

 

 

   用戶在前端點擊立即請求按鈕,通過各種負載均衡軟件或者設備,到達某台機器的某個帶有本調度框架的服務,然後進行具體的執行,也就是說這個立即啟動就是一個最常見最簡單的請求,沒有過多複雜的問題(比如多節點會不會重複執行這些)。最簡單的辦法,當用戶請求過來直接用一個線程或者線程池執行用戶點的那個任務的邏輯代碼就行了,當然我這裏沒有那麼粗暴,現有的調度代碼資源如下:

package com.rdpaas.task.scheduler;

import com.rdpaas.task.common.Invocation;
import com.rdpaas.task.common.Node;
import com.rdpaas.task.common.NotifyCmd;
import com.rdpaas.task.common.Task;
import com.rdpaas.task.common.TaskDetail;
import com.rdpaas.task.common.TaskStatus;
import com.rdpaas.task.config.EasyJobConfig;
import com.rdpaas.task.repository.NodeRepository;
import com.rdpaas.task.repository.TaskRepository;
import com.rdpaas.task.strategy.Strategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 任務調度器
 * @author rongdi
 * @date 2019-03-13 21:15
 */
@Component
public class TaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class);

    @Autowired
    private TaskRepository taskRepository;

    @Autowired
    private NodeRepository nodeRepository;

    @Autowired
    private EasyJobConfig config;

    /**
     * 創建任務到期延時隊列
      */
    private DelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>();

    /**
     * 可以明確知道最多只會運行2個線程,直接使用系統自帶工具就可以了
     */
    private ExecutorService bossPool = Executors.newFixedThreadPool(2);

    /**
     * 正在執行的任務的Future
     */
    private Map<Long,Future> doingFutures = new HashMap<>();

    /**
     * 聲明工作線程池
     */
    private ThreadPoolExecutor workerPool;
    
    /**
     * 獲取任務的策略
     */
    private Strategy strategy;


    @PostConstruct
    public void init() {
        /**
         * 根據配置選擇一個節點獲取任務的策略
         */
        strategy = Strategy.choose(config.getNodeStrategy());
        /**
         * 自定義線程池,初始線程數量corePoolSize,線程池等待隊列大小queueSize,當初始線程都有任務,並且等待隊列滿后
         * 線程數量會自動擴充最大線程數maxSize,當新擴充的線程空閑60s后自動回收.自定義線程池是因為Executors那幾個線程工具
         * 各有各的弊端,不適合生產使用
         */
        workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize()));
        /**
         * 執行待處理任務加載線程
         */
        bossPool.execute(new Loader());
        /**
         * 執行任務調度線程
         */
        bossPool.execute(new Boss());
    
    }

    class Loader implements Runnable {

        @Override
        public void run() {
            for(;;) {
                try { 
                    /**
                     * 先獲取可用的節點列表
                     */
                    List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2);
                    if(nodes == null || nodes.isEmpty()) {
                        continue;
                    }
                    /**
                     * 查找還有指定時間(單位秒)才開始的主任務列表
                     */
                    List<Task> tasks = taskRepository.listNotStartedTasks(config.getFetchDuration());
                    if(tasks == null || tasks.isEmpty()) {
                        continue;
                    }
                    for(Task task:tasks) {
                        
                        boolean accept = strategy.accept(nodes, task, config.getNodeId());
                        /**
                         * 不該自己拿就不要搶
                         */
                        if(!accept) {
                            continue;
                        }
                        /**
                         * 先設置成待執行
                         */
                        task.setStatus(TaskStatus.PENDING);
                        task.setNodeId(config.getNodeId());
                        /**
                         * 使用樂觀鎖嘗試更新狀態,如果更新成功,其他節點就不會更新成功。如果其它節點也正在查詢未完成的
                         * 任務列表和當前這段時間有節點已經更新了這個任務,version必然和查出來時候的version不一樣了,這裏更新
                         * 必然會返回0了
                         */
                        int n = taskRepository.updateWithVersion(task);
                        Date nextStartTime = task.getNextStartTime();
                        if(n == 0 || nextStartTime == null) {
                            continue;
                        }
                        /**
                         * 封裝成延時對象放入延時隊列,這裏再查一次是因為上面樂觀鎖已經更新了版本,會導致後面結束任務更新不成功
                         */
                        task = taskRepository.get(task.getId());
                        DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task);
                        taskQueue.offer(delayItem);
                        
                    }
                    Thread.sleep(config.getFetchPeriod());
                } catch(Exception e) {
                    logger.error("fetch task list failed,cause by:{}", e);
                }
            }
        }
        
    }
    
    class Boss implements Runnable {
        @Override
        public void run() {
            for (;;) {
                try {
                     /**
                     * 時間到了就可以從延時隊列拿出任務對象,然後交給worker線程池去執行
                     */
                    DelayItem<Task> item = taskQueue.take();
                    if(item != null && item.getItem() != null) {
                        Task task = item.getItem();
                        /**
                         * 真正開始執行了設置成執行中
                         */
                        task.setStatus(TaskStatus.DOING);
                        /**
                         * loader線程中已經使用樂觀鎖控制了,這裏沒必要了
                         */
                        taskRepository.update(task);
                        /**
                         * 提交到線程池
                         */
                        Future future = workerPool.submit(new Worker(task));
                        /**
                         * 暫存在doingFutures
                         */
                        doingFutures.put(task.getId(),future);
                    }
                     
                } catch (Exception e) {
                    logger.error("fetch task failed,cause by:{}", e);
                }
            }
        }

    }

    class Worker implements Callable<String> {

        private Task task;

        public Worker(Task task) {
            this.task = task;
        }

        @Override
        public String call() {
            logger.info("Begin to execute task:{}",task.getId());
            TaskDetail detail = null;
            try {
                //開始任務
                detail = taskRepository.start(task);
                if(detail == null) return null;
                //執行任務
                task.getInvokor().invoke();
                //完成任務
                finish(task,detail);
                logger.info("finished execute task:{}",task.getId());
                /**
                 * 執行完后刪了
                 */
                doingFutures.remove(task.getId());
            } catch (Exception e) {
                logger.error("execute task:{} error,cause by:{}",task.getId(), e);
                try {
                    taskRepository.fail(task,detail,e.getCause().getMessage());
                } catch(Exception e1) {
                    logger.error("fail task:{} error,cause by:{}",task.getId(), e);
                }
            }
            return null;
        }

    }

    /**
     * 完成子任務,如果父任務失敗了,子任務不會執行
     * @param task
     * @param detail
     * @throws Exception
     */
    private void finish(Task task,TaskDetail detail) throws Exception {

        //查看是否有子類任務
        List<Task> childTasks = taskRepository.getChilds(task.getId());
        if(childTasks == null || childTasks.isEmpty()) {
            //當沒有子任務時完成父任務
            taskRepository.finish(task,detail);
            return;
        } else {
            for (Task childTask : childTasks) {
                //開始任務
                TaskDetail childDetail = null;
                try {
                    //將子任務狀態改成執行中
                    childTask.setStatus(TaskStatus.DOING);
                    childTask.setNodeId(config.getNodeId());
                    //開始子任務
                    childDetail = taskRepository.startChild(childTask,detail);
                    //使用樂觀鎖更新下狀態,不然這裏可能和恢複線程產生併發問題
                    int n = taskRepository.updateWithVersion(childTask);
                    if (n > 0) {
                        //再從數據庫取一下,避免上面update修改后version不同步
                        childTask = taskRepository.get(childTask.getId());
                        //執行子任務
                        childTask.getInvokor().invoke();
                        //完成子任務
                        finish(childTask, childDetail);
                    }
                } catch (Exception e) {
                    logger.error("execute child task error,cause by:{}", e);
                    try {
                        taskRepository.fail(childTask, childDetail, e.getCause().getMessage());
                    } catch (Exception e1) {
                        logger.error("fail child task error,cause by:{}", e);
                    }
                }
            }
            /**
             * 當有子任務時完成子任務后再完成父任務
             */
            taskRepository.finish(task,detail);

        }

    }

    /**
     * 添加任務
     * @param name
     * @param cronExp
     * @param invockor
     * @return
     * @throws Exception
     */
    public long addTask(String name, String cronExp, Invocation invockor) throws Exception {
        Task task = new Task(name,cronExp,invockor);
        return taskRepository.insert(task);
    }

    /**
     * 添加子任務
     * @param pid
     * @param name
     * @param cronExp
     * @param invockor
     * @return
     * @throws Exception
     */
    public long addChildTask(Long pid,String name, String cronExp, Invocation invockor) throws Exception {
        Task task = new Task(name,cronExp,invockor);
        task.setPid(pid);
        return taskRepository.insert(task);
    }

   
}

  上面主要就是三組線程,Loader負責加載將要執行的任務放入本地的任務隊列,Boss線程負責取出任務隊列的任務,然後分配Worker線程池的一個線程去執行。由上面的代碼可以看到如果要立即執行,其實只需要把一個延時為0的任務放入任務隊列,等着Boss線程去取然後分配給worker執行就可以實現了,代碼如下:

    /**
     * 立即執行任務,就是設置一下延時為0加入任務隊列就好了,這個可以外部直接調用
     * @param taskId
     * @return
     */
    public boolean startNow(Long taskId) {
        Task task = taskRepository.get(taskId);
        task.setStatus(TaskStatus.DOING);
        taskRepository.update(task);
        DelayItem<Task> delayItem = new DelayItem<Task>(0L, task);
        return taskQueue.offer(delayItem);
    }

  啟動不用再多說,下面介紹一下停止任務,根據面向對象的思維,用戶要想停止一個任務,最終執行停止任務的就是正在執行任務的那個節點。停止任務有兩種情況,第一種任務沒有正在運行如何停止,第二種是任務正在運行如何停止。第一種其實直接改變一下任務對象的狀態為停止就行了,不必多說。下面主要考慮如何停止正在運行的任務,細心的朋友可能已經發現上面代碼和之前那一篇代碼有點區別,之前用的Runnble作為線程實現接口,這個用了Callable,其實在java中停止線程池中正在運行的線程最常用的就是直接調用future的cancel方法了,要想獲取到這個future對象就需要將以前實現Runnbale改成實現Callable,然後提交到線程池由execute改成submit就可以了,然後每次提交到線程池得到的future對象使用taskId一起保存在一個map中,方便根據taskId隨時找到。當然任務執行完后要及時刪除這個map里的任務,以免常駐其中導致內存溢出。停止任務的請求流程如下

  

 

 

  圖還是原來的圖,但是這時候情況不一樣了,因為停止任務的時候假如當前正在執行這個任務的節點處於服務1,負載均衡是不知道要去把你引到服務1的,他可能會引入到服務2,那就悲劇了,所以通用的做法就是停止請求過來不管落到哪個節點上,那個節點就往一個公用的mq上發一個帶有停止任務業務含義的消息,各個節點訂閱這個消息,然後判斷都判斷任務在不在自己這裏執行,如果在就執行停止操作。但是這樣勢必讓我們的調度服務又要依賴一個外部的消息隊列服務,就算很方便的就可以引入一個外部的消息隊列,但是你真的可以駕馭的了嗎,消息丟了咋辦,重複發送了咋辦,消息服務掛了咋辦,網絡斷了咋辦,又引入了一大堆問題,那我是不是又要寫n篇文章來分別解決這些問題。往往現實卻是就是這麼殘酷,你解決了一個問題,引入了更多的問題,這就是為什麼bug永遠改不完的道理了。當然這不是我的風格,我的風格是利用有限的資源做盡可能多的事情(可能是由於我工作的企業都是那種資源貧瘠的,養成了我這種習慣,土豪公司的程序員請繞道,哈哈)。

  簡化一下問題:目前的問題就是如何讓正在執行任務的節點知道,然後停止正在執行的這個任務,其實就是這個停止通知如何實現。這不免讓我想起了12306網站上買票,其實我們作為老百姓多麼希望12306可以在有票的時候發個短信通知一下我們,然後我們上去搶,但是現實卻是,你要麼使用軟件一直刷,要麼是自己隔一段時間上去瞄一下有沒有票。如果把有票了給我們發短信通知定義為異步通知,那麼這種我們要隔一段時間自己去瞄一下的方式就是同步輪訓。這兩種方式都能達到告知的目的,關鍵的區別在於你到底有沒有時間去一直去瞄,不過相比於可以回家,這些時間都是值得的。個人認為軟件的設計其實就是一個權衡是否值得的過程。如果約定了不使用外部消息隊列這種異步通知的方式,那麼我們只能使用同步輪訓的方式了。不過正好我們的任務調度本身已經有一個心跳機制,沒隔一段時間就去更新一下節點狀態,如果我們把用戶的停止請求作為命令信息更新到每個節點的上,然後隨着心跳獲取到這個節點的信息,然後判斷這個命令,做相應的處理是不是就可以完美解決這個問題。值得嗎?很明顯是值得的,我們只是在心跳邏輯上加一個小小的副作用就實現了通知功能了。代碼如下

package com.rdpaas.task.common;

/**
 * @author rongdi
 * @date 2019/11/26
 */
public enum NotifyCmd {

    //沒有通知,默認狀態
    NO_NOTIFY(0),
    //開啟任務(Task)
    START_TASK(1),
    //修改任務(Task)
    EDIT_TASK(2),
    //停止任務(Task)
    STOP_TASK(3);

    int id;

    NotifyCmd(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    public static NotifyCmd valueOf(int id) {
        switch (id) {
            case 1:
                return START_TASK;
            case 2:
                return EDIT_TASK;
            case 3:
                return STOP_TASK;
            default:
                return NO_NOTIFY;
        }
    }

}
package com.rdpaas.task.handles;

import com.rdpaas.task.common.NotifyCmd;
import com.rdpaas.task.utils.SpringContextUtil;

/**
 * @author: rongdi
 * @date:
 */
public interface NotifyHandler<T> {

    static NotifyHandler chooseHandler(NotifyCmd notifyCmd) {
        return SpringContextUtil.getByTypeAndName(NotifyHandler.class,notifyCmd.toString());
    }

    public void update(T t);

}
package com.rdpaas.task.handles;

import com.rdpaas.task.scheduler.TaskExecutor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author: rongdi
 * @date:
 */
@Component("STOP_TASK")
public class StopTaskHandler implements NotifyHandler<Long> {

    @Autowired
    private TaskExecutor taskExecutor;

    @Override
    public void update(Long taskId) {
        taskExecutor.stop(taskId);
    }

}
class HeartBeat implements Runnable {
        @Override
        public void run() {
            for(;;) {
                try {
                    /**
                     * 時間到了就可以從延時隊列拿出節點對象,然後更新時間和序號,
                     * 最後再新建一個超時時間為心跳時間的節點對象放入延時隊列,形成循環的心跳
                     */
                    DelayItem<Node> item = heartBeatQueue.take();
                    if(item != null && item.getItem() != null) {
                        Node node = item.getItem();
                        handHeartBeat(node);
                    }
                    heartBeatQueue.offer(new DelayItem<>(config.getHeartBeatSeconds() * 1000,new Node(config.getNodeId())));
                } catch (Exception e) {
                    logger.error("task heart beat error,cause by:{} ",e);
                }
            }
        }
    }

    /**
     * 處理節點心跳
     * @param node
     */
    private void handHeartBeat(Node node) {
        if(node == null) {
            return;
        }
        /**
         * 先看看數據庫是否存在這個節點
         * 如果不存在:先查找下一個序號,然後設置到node對象中,最後插入
         * 如果存在:直接根據nodeId更新當前節點的序號和時間
         */
        Node currNode= nodeRepository.getByNodeId(node.getNodeId());
        if(currNode == null) {
            node.setRownum(nodeRepository.getNextRownum());
            nodeRepository.insert(node);
        } else  {
            nodeRepository.updateHeartBeat(node.getNodeId());
            NotifyCmd cmd = currNode.getNotifyCmd();
            String notifyValue = currNode.getNotifyValue();
            if(cmd != null && cmd != NotifyCmd.NO_NOTIFY) {
                /**
                 * 藉助心跳做一下通知的事情,比如及時停止正在執行的任務
                 * 根據指令名稱查找Handler
                 */
                NotifyHandler handler = NotifyHandler.chooseHandler(currNode.getNotifyCmd());
                if(handler == null || StringUtils.isEmpty(notifyValue)) {
                    return;
                }
                /**
                 * 執行操作
                 */
                handler.update(Long.valueOf(notifyValue));
            }
            
        }


    }

  最終的任務調度代碼如下:

package com.rdpaas.task.scheduler;

import com.rdpaas.task.common.Invocation;
import com.rdpaas.task.common.Node;
import com.rdpaas.task.common.NotifyCmd;
import com.rdpaas.task.common.Task;
import com.rdpaas.task.common.TaskDetail;
import com.rdpaas.task.common.TaskStatus;
import com.rdpaas.task.config.EasyJobConfig;
import com.rdpaas.task.repository.NodeRepository;
import com.rdpaas.task.repository.TaskRepository;
import com.rdpaas.task.strategy.Strategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * 任務調度器
 * @author rongdi
 * @date 2019-03-13 21:15
 */
@Component
public class TaskExecutor {

    private static final Logger logger = LoggerFactory.getLogger(TaskExecutor.class);

    @Autowired
    private TaskRepository taskRepository;

    @Autowired
    private NodeRepository nodeRepository;

    @Autowired
    private EasyJobConfig config;

    /**
     * 創建任務到期延時隊列
      */
    private DelayQueue<DelayItem<Task>> taskQueue = new DelayQueue<>();

    /**
     * 可以明確知道最多只會運行2個線程,直接使用系統自帶工具就可以了
     */
    private ExecutorService bossPool = Executors.newFixedThreadPool(2);

    /**
     * 正在執行的任務的Future
     */
    private Map<Long,Future> doingFutures = new HashMap<>();

    /**
     * 聲明工作線程池
     */
    private ThreadPoolExecutor workerPool;
    
    /**
     * 獲取任務的策略
     */
    private Strategy strategy;


    @PostConstruct
    public void init() {
        /**
         * 根據配置選擇一個節點獲取任務的策略
         */
        strategy = Strategy.choose(config.getNodeStrategy());
        /**
         * 自定義線程池,初始線程數量corePoolSize,線程池等待隊列大小queueSize,當初始線程都有任務,並且等待隊列滿后
         * 線程數量會自動擴充最大線程數maxSize,當新擴充的線程空閑60s后自動回收.自定義線程池是因為Executors那幾個線程工具
         * 各有各的弊端,不適合生產使用
         */
        workerPool = new ThreadPoolExecutor(config.getCorePoolSize(), config.getMaxPoolSize(), 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(config.getQueueSize()));
        /**
         * 執行待處理任務加載線程
         */
        bossPool.execute(new Loader());
        /**
         * 執行任務調度線程
         */
        bossPool.execute(new Boss());
    
    }

    class Loader implements Runnable {

        @Override
        public void run() {
            for(;;) {
                try { 
                    /**
                     * 先獲取可用的節點列表
                     */
                    List<Node> nodes = nodeRepository.getEnableNodes(config.getHeartBeatSeconds() * 2);
                    if(nodes == null || nodes.isEmpty()) {
                        continue;
                    }
                    /**
                     * 查找還有指定時間(單位秒)才開始的主任務列表
                     */
                    List<Task> tasks = taskRepository.listNotStartedTasks(config.getFetchDuration());
                    if(tasks == null || tasks.isEmpty()) {
                        continue;
                    }
                    for(Task task:tasks) {
                        
                        boolean accept = strategy.accept(nodes, task, config.getNodeId());
                        /**
                         * 不該自己拿就不要搶
                         */
                        if(!accept) {
                            continue;
                        }
                        /**
                         * 先設置成待執行
                         */
                        task.setStatus(TaskStatus.PENDING);
                        task.setNodeId(config.getNodeId());
                        /**
                         * 使用樂觀鎖嘗試更新狀態,如果更新成功,其他節點就不會更新成功。如果其它節點也正在查詢未完成的
                         * 任務列表和當前這段時間有節點已經更新了這個任務,version必然和查出來時候的version不一樣了,這裏更新
                         * 必然會返回0了
                         */
                        int n = taskRepository.updateWithVersion(task);
                        Date nextStartTime = task.getNextStartTime();
                        if(n == 0 || nextStartTime == null) {
                            continue;
                        }
                        /**
                         * 封裝成延時對象放入延時隊列,這裏再查一次是因為上面樂觀鎖已經更新了版本,會導致後面結束任務更新不成功
                         */
                        task = taskRepository.get(task.getId());
                        DelayItem<Task> delayItem = new DelayItem<Task>(nextStartTime.getTime() - new Date().getTime(), task);
                        taskQueue.offer(delayItem);
                        
                    }
                    Thread.sleep(config.getFetchPeriod());
                } catch(Exception e) {
                    logger.error("fetch task list failed,cause by:{}", e);
                }
            }
        }
        
    }
    
    class Boss implements Runnable {
        @Override
        public void run() {
            for (;;) {
                try {
                     /**
                     * 時間到了就可以從延時隊列拿出任務對象,然後交給worker線程池去執行
                     */
                    DelayItem<Task> item = taskQueue.take();
                    if(item != null && item.getItem() != null) {
                        Task task = item.getItem();
                        /**
                         * 真正開始執行了設置成執行中
                         */
                        task.setStatus(TaskStatus.DOING);
                        /**
                         * loader線程中已經使用樂觀鎖控制了,這裏沒必要了
                         */
                        taskRepository.update(task);
                        /**
                         * 提交到線程池
                         */
                        Future future = workerPool.submit(new Worker(task));
                        /**
                         * 暫存在doingFutures
                         */
                        doingFutures.put(task.getId(),future);
                    }
                     
                } catch (Exception e) {
                    logger.error("fetch task failed,cause by:{}", e);
                }
            }
        }

    }

    class Worker implements Callable<String> {

        private Task task;

        public Worker(Task task) {
            this.task = task;
        }

        @Override
        public String call() {
            logger.info("Begin to execute task:{}",task.getId());
            TaskDetail detail = null;
            try {
                //開始任務
                detail = taskRepository.start(task);
                if(detail == null) return null;
                //執行任務
                task.getInvokor().invoke();
                //完成任務
                finish(task,detail);
                logger.info("finished execute task:{}",task.getId());
                /**
                 * 執行完后刪了
                 */
                doingFutures.remove(task.getId());
            } catch (Exception e) {
                logger.error("execute task:{} error,cause by:{}",task.getId(), e);
                try {
                    taskRepository.fail(task,detail,e.getCause().getMessage());
                } catch(Exception e1) {
                    logger.error("fail task:{} error,cause by:{}",task.getId(), e);
                }
            }
            return null;
        }

    }

    /**
     * 完成子任務,如果父任務失敗了,子任務不會執行
     * @param task
     * @param detail
     * @throws Exception
     */
    private void finish(Task task,TaskDetail detail) throws Exception {

        //查看是否有子類任務
        List<Task> childTasks = taskRepository.getChilds(task.getId());
        if(childTasks == null || childTasks.isEmpty()) {
            //當沒有子任務時完成父任務
            taskRepository.finish(task,detail);
            return;
        } else {
            for (Task childTask : childTasks) {
                //開始任務
                TaskDetail childDetail = null;
                try {
                    //將子任務狀態改成執行中
                    childTask.setStatus(TaskStatus.DOING);
                    childTask.setNodeId(config.getNodeId());
                    //開始子任務
                    childDetail = taskRepository.startChild(childTask,detail);
                    //使用樂觀鎖更新下狀態,不然這裏可能和恢複線程產生併發問題
                    int n = taskRepository.updateWithVersion(childTask);
                    if (n > 0) {
                        //再從數據庫取一下,避免上面update修改后version不同步
                        childTask = taskRepository.get(childTask.getId());
                        //執行子任務
                        childTask.getInvokor().invoke();
                        //完成子任務
                        finish(childTask, childDetail);
                    }
                } catch (Exception e) {
                    logger.error("execute child task error,cause by:{}", e);
                    try {
                        taskRepository.fail(childTask, childDetail, e.getCause().getMessage());
                    } catch (Exception e1) {
                        logger.error("fail child task error,cause by:{}", e);
                    }
                }
            }
            /**
             * 當有子任務時完成子任務后再完成父任務
             */
            taskRepository.finish(task,detail);

        }

    }

    /**
     * 添加任務
     * @param name
     * @param cronExp
     * @param invockor
     * @return
     * @throws Exception
     */
    public long addTask(String name, String cronExp, Invocation invockor) throws Exception {
        Task task = new Task(name,cronExp,invockor);
        return taskRepository.insert(task);
    }

    /**
     * 添加子任務
     * @param pid
     * @param name
     * @param cronExp
     * @param invockor
     * @return
     * @throws Exception
     */
    public long addChildTask(Long pid,String name, String cronExp, Invocation invockor) throws Exception {
        Task task = new Task(name,cronExp,invockor);
        task.setPid(pid);
        return taskRepository.insert(task);
    }

    /**
     * 立即執行任務,就是設置一下延時為0加入任務隊列就好了,這個可以外部直接調用
     * @param taskId
     * @return
     */
    public boolean startNow(Long taskId) {
        Task task = taskRepository.get(taskId);
        task.setStatus(TaskStatus.DOING);
        taskRepository.update(task);
        DelayItem<Task> delayItem = new DelayItem<Task>(0L, task);
        return taskQueue.offer(delayItem);
    }

    /**
     * 立即停止正在執行的任務,留給外部調用的方法
     * @param taskId
     * @return
     */
    public boolean stopNow(Long taskId) {
        Task task = taskRepository.get(taskId);
        if(task == null) {
            return false;
        }
        /**
         * 該任務不是正在執行,直接修改task狀態為已完成即可
         */
        if(task.getStatus() != TaskStatus.DOING) {
            task.setStatus(TaskStatus.STOP);
            taskRepository.update(task);
            return true;
        }
        /**
         * 該任務正在執行,使用節點配合心跳發布停用通知
         */
        int n = nodeRepository.updateNotifyInfo(NotifyCmd.STOP_TASK,String.valueOf(taskId));
        return n > 0;
    }

    /**
     * 立即停止正在執行的任務,這個不需要自己調用,是給心跳線程調用
     * @param taskId
     * @return
     */
    public boolean stop(Long taskId) {
        Task task = taskRepository.get(taskId);
        /**
         * 不是自己節點的任務,本節點不能執行停用
         */
        if(task == null || !config.getNodeId().equals(task.getNodeId())) {
            return false;
        }
        /**
         * 拿到正在執行任務的future,然後強制停用,並刪除doingFutures的任務
         */
        Future future = doingFutures.get(taskId);
        boolean flag =  future.cancel(true);
        if(flag) {
            doingFutures.remove(taskId);
            /**
             * 修改狀態為已停用
             */
            task.setStatus(TaskStatus.STOP);
            taskRepository.update(task);
        }
        /**
         * 重置通知信息,避免重複執行停用通知
         */
        nodeRepository.resetNotifyInfo(NotifyCmd.STOP_TASK);
        return flag;
    }
}

  好吧,其實實現很簡單,關鍵在於思路,不BB了,詳細代碼見: 在下告辭!

  

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

3c收購,鏡頭 收購有可能以全新價回收嗎?

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

賣IPHONE,iPhone回收,舊換新!教你怎麼賣才划算?

[apue] 神奇的 Solaris pipe

說到 pipe 大家可能都不陌生,經典的pipe調用配合fork進行父子進程通訊,簡直就是Unix程序的標配。

然而Solaris上的pipe卻和Solaris一樣是個奇葩(雖然Solaris前途黯淡,但是不妨礙我們從它裏面挖掘一些有價值的東西),

有着和一般pipe諸多的不同之處,本文就來說說Solaris上神奇的pipe和一般pipe之間的異同。

 

1.solaris pipe 是全雙工的

一般系統上的pipe調用是半雙工的,只能單向傳遞數據,如果需要雙向通訊,我們一般是建兩個pipe分別讀寫。像下面這樣:

 1     int n, fd1[2], fd2[2]; 
 2     if (pipe (fd1) < 0 || pipe(fd2) < 0)
 3         err_sys ("pipe error"); 
 4 
 5     char line[MAXLINE]; 
 6     pid_t pid = fork (); 
 7     if (pid < 0) 
 8         err_sys ("fork error"); 
 9     else if (pid > 0)
10     {
11         close (fd1[0]);  // write on pipe1 as stdin for co-process
12         close (fd2[1]);  // read on pipe2 as stdout for co-process
13         while (fgets (line, MAXLINE, stdin) != NULL) { 
14             n = strlen (line); 
15             if (write (fd1[1], line, n) != n)
16                 err_sys ("write error to pipe"); 
17             if ((n = read (fd2[0], line, MAXLINE)) < 0)
18                 err_sys ("read error from pipe"); 
19 
20             if (n == 0) { 
21                 err_msg ("child closed pipe"); 
22                 break;
23             }
24             line[n] = 0; 
25             if (fputs (line, stdout) == EOF)
26                 err_sys ("fputs error"); 
27         }
28 
29         if (ferror (stdin))
30             err_sys ("fputs error"); 
31 
32         return 0; 
33     }
34     else { 
35         close (fd1[1]); 
36         close (fd2[0]); 
37         if (fd1[0] != STDIN_FILENO) { 
38             if (dup2 (fd1[0], STDIN_FILENO) != STDIN_FILENO)
39                 err_sys ("dup2 error to stdin"); 
40             close (fd1[0]); 
41         }
42 
43         if (fd2[1] != STDOUT_FILENO) { 
44             if (dup2 (fd2[1], STDOUT_FILENO) != STDOUT_FILENO)
45                 err_sys ("dup2 error to stdout"); 
46             close (fd2[1]); 
47         }
48 
49         if (execl (argv[1], "add2", (char *)0) < 0)
50             err_sys ("execl error"); 
51     }

這個程序創建兩個管道,fd1用來寫請求,fd2用來讀應答;對子進程而言,fd1重定向到標準輸入,fd2重定向到標準輸出,讀取stdin中的數據相加然後寫入stdout完成工作。父進程在取得應答後向標準輸出寫入結果。

如果在Solaris上,可以直接用一個pipe同時讀寫,代碼可以重寫成這樣:

 1 int fd[2];
 2 if (pipe(fd) < 0) 
 3     err_sys("pipe error\n");
 4 
 5 char line[MAXLINE];
 6 pid_t pid = fork();
 7 if (pid < 0)
 8     err_sys("fork error\n");
 9 else if (pid > 0)
10 {
11     close(fd[1]);
12     while (fgets(line, MAXLINE, stdin) != NULL) {
13         n = strlen(line);
14         if (write(fd[0], line, n) != n)
15             err_sys("write error to pipe\n")
16         if ((n = read(fd[0], line, MAXLINE)) < 0) 
17             err_sys("read error from pipe\n");
18 
19         if (n == 0) 
20             err_sys("child closed pipe\n");
21         line[n] = 0;
22         if (fputs(line, stdout) == EOF) 
23             err_sys("fputs error\n");
24     }
25 
26     if (ferror(stdin))
27         err_sys("fputs error\n");
28 
29     return 0;
30 }
31 else {
32     close(fd[0]);
33     if (fd[1] != STDIN_FILENO)
34         if (dup2(fd[1], STDIN_FILENO) != STDIN_FILENO)
35             err_sys("dup2 error to stdin\n");
36 
37     if (fd[1] != STDOUT_FILENO) {
38         if (dup2(fd[1], STDOUT_FILENO) != STDOUT_FILENO)
39             err_sys("dup2 error to stdout\n");
40         close(fd[1]);
41     }
42 
43     if (execl(argv[1], argv[2], (char *)0) < 0)
44         err_sys("execl error\n");
45 
46 }

代碼清爽多了,不用去考慮fd1[0]和fd2[1]是啥意思是一件很養腦的事。

不過這樣的代碼只能在Solaris上運行(聽說BSD也支持?),如果考慮到可移植性,還是寫上面的比較穩妥。

 

測試程序

 

 

2. solaris pipe 可以脫離父子關係建立

pipe 好用但是沒法脫離fork使用,一般的pipe如果想讓任意兩個進程通訊,得藉助它的變身fifo來實現。

關於FIFO,詳情可參考我之前寫的一篇文章:

 

而Solaris上的pipe沒這麼多事,加入兩個調用:fattach / fdetach,你就可以像使用FIFO一樣使用pipe了:

 1 int fd[2];
 2 if (pipe(fd) < 0)
 3     err_sys("pipe error\n");
 4 
 5 if (fattach(fd[1], "./pipe") < 0)
 6     err_sys("fattach error\n");
 7 
 8 printf("attach to file pipe ok\n");
 9 
10 close(fd[1]);
11 char line[MAXLINE];
12 while (fgets(line, MAXLINE, stdin) != NULL) {
13     n = strlen(line);
14     if (write(fd[0], line, n) != n)
15         err_sys("write error to pipe\n");
16     if ((n = read(fd[0], line, MAXLINE)) < 0)
17         err_sys("read error from pipe\n");
18 
19     if (n == 0) 
20         err_sys("child closed pipe\n");
21 
22     line[n] = 0;
23     if (fputs(line, stdout) == EOF)
24         err_sys("fputs error\n");
25 }
26 
27 if (ferror(stdin))
28     err_sys("fputs error\n");
29 
30 if (fdetach("./pipe") < 0)
31     err_sys("fdetach error\n");
32 
33 printf("detach from file pipe ok\n");

在pipe調用之後立即加入fattach調用,可以將管道關聯到文件系統的一個文件名上,該文件必需事先存在,且可讀可寫。

在fattach調用之前這個文件(./pipe)是個普通文件,打開讀寫都是磁盤IO;

在fattach調用之後,這個文件就變身成為一個管道了,打開讀寫都是內存流操作,且管道的另一端就是attach的那個進程。

子進程也需要改造一下,以便使用pipe通訊:

 1 int fd, n, int1, int2;
 2 char line[MAXLINE];
 3 fd = open("./pipe", O_RDWR);
 4 if (fd < 0)
 5     err_sys("open file pipe failed\n");
 6 
 7 printf("open file pipe ok, fd = %d\n", fd);
 8 while ((n = read(fd, line, MAXLINE)) > 0) {
 9     line[n] = 0;
10     if (sscanf(line, "%d%d", &int1, &int2) == 2) {
11         sprintf(line, "%d\n", int1 + int2);
12         n = strlen(line);
13         if (write(fd, line, n) != n)
14             err_sys("write error\n");
15 
16         printf("i am working on %s\n", line);
17     }
18     else {
19         if (write(fd, "invalid args\n", 13) != 13)
20             err_sys("write msg error\n");
21     }
22 }
23 
24 close(fd);

打開pipe就如同打開普通文件一樣,open直接搞定。當然前提是attach進程必需已經在運行。

當attach進程detach后,管道文件又將恢復它的本來面目。

 

脫離了父子關係的pipe其實可以建立多對一關係(多對多關係不可以,因為只能有一個進程attach)。

例如開4個cmd窗口,分別執行以下命令:

./padd2 abc
./add2
./add2
./add2

 向attach進程(padd2)發送9個計算請求后,可以看到輸出結果如下:

-bash-3.2$ ./padd2 abc
attach to file pipe ok
1 1
2
2 2
4
3 3 
6
4 4
8
5 5
10
6 6 
12
7 7 
14
8 8
16
9 9
18

 再回來看各個open管道的進程,輸出分別如下:

-bash-3.2$ ./add2
open file pipe ok, fd = 3
source: 1 1
i am working on 2
source: 4 4
i am working on 8
source: 7 7 
i am working on 14 

 

-bash-3.2$ ./add2
open file pipe ok, fd = 3
source: 2 2
i am working on 4
source: 5 5
i am working on 10
source: 9 9
i am working on 18 

 

-bash-3.2$ ./add2
open file pipe ok, fd = 3
source: 2 2
i am working on 4
source: 5 5
i am working on 10
source: 9 9
i am working on 18 

 

-bash-3.2$ ./add2
open file pipe ok, fd = 3
source: 3 3
i am working on 6
source: 6 6
i am working on 12
source: 8 8 
i am working on 16

 

可以發現一個很有趣的現象,就是各個add2進程基本是輪着來獲取請求的,可以猜想底層的pipe可能有一個進程排隊機制。

但是反過來使用pipe就不行了。就是說當啟動一個add3(區別於上例的add2與padd2)作為fattach端打開pipe,啟動多個padd3作為open端使用pipe,

然後通過命令行給padd3傳遞要相加的值,可以寫一個腳本同時啟動多個padd3,來查看效果:

#! /bin/sh
./padd3 1 1 &
./padd3 2 2 &
./padd3 3 3 &
./padd3 4 4 &

 這個腳本中啟動了4個加法進程,同時向add3發送4個加法請求,腳本中四個進程輸出如下:

-bash-3.2$ ./padd3.sh
-bash-3.2$ open file pipe ok, fd = 3
1 1 = 2
open file pipe ok, fd = 3
2 2 = 4
open file pipe ok, fd = 3
open file pipe ok, fd = 3
4 4 = 37

 可以看到3+3的請求被忽略了,轉到add3查看輸出:

-bash-3.2$ ./add3
attach to file pipe ok
source: 1 1
i am working on 1 + 1 = 2
source: 2 2
i am working on 2 + 2 = 4
source: 3 34 4
i am working on 3 + 34 = 37

 原來是3+3與4+4兩個請求粘連了,導致add3識別成一個3+34的請求,所以出錯了。

多運行幾遍腳本后,發現還有這樣的輸出:

-bash-3.2$ ./padd3.sh
-bash-3.2$ open file pipe ok, fd = 3
4 4 = 2
open file pipe ok, fd = 3
2 2 = 4
open file pipe ok, fd = 3
3 3 = 6
open file pipe ok, fd = 3
1 1 = 8

  4+4=2?1+1=8?再看add3這頭的輸出:

-bash-3.2$ ./add3
attach to file pipe ok
source: 1 1
i am working on 1 + 1 = 2
source: 2 2
i am working on 2 + 2 = 4
source: 3 3
i am working on 3 + 3 = 6
source: 4 4
i am working on 4 + 4 = 8

 完全正常呢。

經過一番推理,發現是4+4的請求取得了1+1請求的應答;1+1的請求取得了4+4的應答。

可見這樣的結構還有一個弊端,同時請求的進程可能無法得到自己的應答,應答與請求之間相互錯位。

所以想用fattach來實現多路請求的人還是洗洗睡吧,畢竟它就是一個pipe不是,還能給它整成tcp么?

而之前的例子可以,是因為請求是順序發送的,上個請求得到應答后才發送下個請求,所以不存在這個例子的問題(但是實用性也不高)。

 

測試程序

 

 

3. solaris pipe 可以通過connld模塊實現類似tcp的多路連接

第2條剛說不能實現多路連接,第3條就接着來打臉了,這是由於Solaris上的pipe都是基於STREAMS技術構建,

而STREAMS是支持靈活的PUSH、POP流處理模塊的,再加上STREAMS傳遞文件fd的能力,就可以支持類似tcp中accept的能力。

即每個open pipe文件的進程,得到的不是原來管道的fd,而是新創建管道的fd,而管道的另一側fd則通過已有的管道發送到attach進程,

後者使用這個新的fd與客戶進程通訊。為了支持多路連接,我們的代碼需要重新整理一下,首先看客戶端:

1 int fd;
2 char line[MAXLINE];
3 fd = cli_conn("./pipe");
4 if (fd < 0)
5     return 0;

這裏將open相關邏輯封裝到了cli_conn函數中,以便之後復用:

 1 int cli_conn(const char *name)
 2 {
 3     int fd;
 4     if ((fd = open(name, O_RDWR)) < 0) {
 5         printf("open pipe file failed\n");
 6         return -1;
 7     }
 8 
 9     if (isastream(fd) == 0) {
10         close(fd);
11         return -2;
12     }
13 
14     return fd;
15 }

可以看到與之前幾乎沒有變化,只是增加了isastream調用防止attach進程沒有啟動。

再來看下服務端:

 1 int listenfd = serv_listen("./pipe");
 2 if (listenfd < 0)
 3     return 0;
 4 
 5 int acceptfd = 0;
 6 int n = 0, int1 = 0, int2 = 0;
 7 char line[MAXLINE];
 8 uid_t uid = 0;
 9 while ((acceptfd = serv_accept(listenfd, &uid)) >= 0)
10 {
11     printf("accept a client, fd = %d, uid = %ld\n", acceptfd, uid);
12     while ((n = read(acceptfd, line, MAXLINE)) > 0) {
13         line[n] = 0;
14         printf("source: %s\n", line);
15         if (sscanf(line, "%d%d", &int1, &int2) == 2) {
16             sprintf(line, "%d\n", int1 + int2);
17             n = strlen(line);
18             if (write(acceptfd, line, n) != n) {
19                 printf("write error\n");
20                 return 0;
21             }
22             printf("i am working on %d + %d = %s\n", int1, int2, line);
23         }
24         else {
25             if (write(acceptfd, "invalid args\n", 13) != 13) {
26                 printf("write msg error\n");
27                 return 0;
28             }
29         }
30     }
31 
32     close(acceptfd);
33 }
34 
35 if (fdetach("./pipe") < 0) {
36     printf("fdetach error\n");
37     return 0;
38 }
39 
40 printf("detach from file pipe ok\n");
41 close(listenfd);

首先調用serv_listen建立基本pipe,然後不斷在該pipe上調用serv_accept來獲取獨立的客戶端連接。之後的邏輯與以前一樣。

現在重點看下封裝的這兩個方法:

 1 int serv_listen(const char *name)
 2 {
 3     int tempfd;
 4     int fd[2];
 5     unlink(name);
 6     tempfd = creat(name, FIFO_MODE);
 7     if (tempfd < 0) {
 8         printf("creat failed\n");
 9         return -1;
10     }
11 
12     if (close(tempfd) < 0) {
13         printf("close temp fd failed\n");
14         return -2;
15     }
16 
17     if (pipe(fd) < 0) {
18         printf("pipe error\n");
19         return -3;
20     }
21 
22     if (ioctl(fd[1], I_PUSH, "connld") < 0) {
23         printf("I_PUSH connld failed\n");
24         close(fd[0]);
25         close(fd[1]);
26         return -4;
27     }
28 
29     printf("push connld ok\n");
30     if (fattach(fd[1], name) < 0) {
31         printf("fattach error\n");
32         close(fd[0]);
33         close(fd[1]);
34         return -5;
35     }
36 
37     printf("attach to file pipe ok\n");
38     close(fd[1]);
39     return fd[0];
40 }

serv_listen封裝了與建立基本pipe相關的代碼,首先確保pipe文件存在且可讀寫,然後創建普通的pipe,在fattach調用之前必需先PUSH一個connld模塊到該pipe STREAM中。這樣就大功告成!

 1 int serv_accept(int listenfd, uid_t *uidptr)
 2 {
 3     struct strrecvfd recvfd;
 4     if (ioctl(listenfd, I_RECVFD, &recvfd) < 0) {
 5         printf("I_RECVFD from listen fd failed\n");
 6         return -1;
 7     }
 8 
 9     if (uidptr)
10         *uidptr = recvfd.uid;
11 
12     return recvfd.fd;
13 }

當有客戶端連接上來的時候,使用I_RECVFD接收connld返回的另一個pipe的fd。之後的數據將在該pipe進行。

看了看,感覺和tcp的listen與accept別無二致,看來天下武功,至精深處都是英雄所見略同。

之前的多個客戶端同時運行的例子再跑一遍,觀察attach端輸出:

-bash-3.2$ ./add4
push connld ok
attach to file pipe ok
accept a client, fd = 4, uid = 101
source: 1 1
i am working on 1 + 1 = 2
accept a client, fd = 4, uid = 101
source: 2 2
i am working on 2 + 2 = 4
accept a client, fd = 4, uid = 101
source: 3 3
i am working on 3 + 3 = 6
accept a client, fd = 4, uid = 101
source: 4 4
i am working on 4 + 4 = 8

 一切正常。再看下腳本中四個進程的輸出:

-bash-3.2$ ./padd4.sh
-bash-3.2$ open file pipe ok, fd = 3
1 1 = 2
open file pipe ok, fd = 3
2 2 = 4
open file pipe ok, fd = 3
3 3 = 6
open file pipe ok, fd = 3
4 4 = 8

 也是沒問題的,既沒有出現多個請求粘連的情況,也沒有出現請求與應答錯位的情況。

 

測試程序

 

 

4.結論

Solaris 上的pipe不僅可以全雙工通訊、不依賴父子進程關係,還可以實現類似tcp那樣分離多個客戶端通訊連接的能力。

雖然Solaris前途未卜,但是希望一些好的東西還是能流傳下來,就比如這個神奇的pipe。

 

看完今天的文章,你是不是對特立獨行的Solaris又加深了一層了解?歡迎留言區說說你認識的Solaris。

 

 

 

 

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

收購3c,收購IPHONE,收購蘋果電腦-詳細收購流程一覽表

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,網站設計公司幫您達到更多曝光效益

※公開收購3c價格,不怕被賤賣!

※自行創業 缺乏曝光? 下一步"網站設計"幫您第一時間規劃公司的門面形象

嵌入式、C語言位操作的一些技巧匯總

下面分享關於位操作的一些筆記:

一、位操作簡單介紹

首先,以下是按位運算符:

嵌入式編程中,常常需要對一些寄存器進行配置,有的情況下需要改變一個字節中的某一位或者幾位,但是又不想改變其它位原有的值,這時就可以使用按位運算符進行操作。下面進行舉例說明,假如有一個8位的TEST寄存器:

當我們要設置第0位bit0的值為1時,可能會這樣進行設置:

TEST = 0x01;

但是,這樣設置是不夠準確的,因為這時候已經同時操作到了高7位:bit1~bit7,如果這高7位沒有用到的話,這麼設置沒有什麼影響;但是,如果這7位正在被使用,結果就不是我們想要的了。

在這種情況下,我們就可以借用按位操作運算符進行配置。

對於二進制位操作來說,不管該位原來的值是0還是1,它跟0進行&運算,得到的結果都是0,而跟1進行&運算,將保持原來的值不變;不管該位原來的值是0還是1,它跟1進行|運算,得到的結果都是1,而跟0進行|運算,將保持原來的值不變。

所以,此時可以設置為:

TEST = TEST | 0x01;

其意義為:TEST寄存器的高7位均不變,最低位變成1了。在實際編程中,常改寫為:

TEST |= 0x01;

這種寫法可以一定程度上簡化代碼,是 C 語言常用的一種編程風格。設置寄存器的某一位還有另一種操作方法,以上的等價方法如:

TEST |= (0x01 << 0);

第幾位要置1就左移幾位。

同樣的,要給TEST的低4位清0,高4位保持不變,可以進行如下配置:

TEST &= 0xF0;

二、嵌入式中位操作一些常見用法

1、一個32bit數據的位、字節讀取操作

(1)獲取單字節:

#define GET_LOW_BYTE0(x)    ((x >>  0) & 0x000000ff)    /* 獲取第0個字節 */
#define GET_LOW_BYTE1(x)    ((x >>  8) & 0x000000ff)    /* 獲取第1個字節 */
#define GET_LOW_BYTE2(x)    ((x >> 16) & 0x000000ff)    /* 獲取第2個字節 */
#define GET_LOW_BYTE3(x)    ((x >> 24) & 0x000000ff)    /* 獲取第3個字節 */

示例:

(2)獲取某一位:

#define GET_BIT(x, bit) ((x & (1 << bit)) >> bit)   /* 獲取第bit位 */

示例:

2、一個32bit數據的位、字節清零操作

(1)清零某個字節:

#define CLEAR_LOW_BYTE0(x)  (x &= 0xffffff00)   /* 清零第0個字節 */
#define CLEAR_LOW_BYTE1(x)  (x &= 0xffff00ff)   /* 清零第1個字節 */
#define CLEAR_LOW_BYTE2(x)  (x &= 0xff00ffff)   /* 清零第2個字節 */
#define CLEAR_LOW_BYTE3(x)  (x &= 0x00ffffff)   /* 清零第3個字節 */

示例:

(2)清零某一位:

#define CLEAR_BIT(x, bit)   (x &= ~(1 << bit))  /* 清零第bit位 */

示例:

3、一個32bit數據的位、字節置1操作

(1)置某個字節為1:

#define SET_LOW_BYTE0(x)    (x |= 0x000000ff)   /* 第0個字節置1 */   
#define SET_LOW_BYTE1(x)    (x |= 0x0000ff00)   /* 第1個字節置1 */   
#define SET_LOW_BYTE2(x)    (x |= 0x00ff0000)   /* 第2個字節置1 */   
#define SET_LOW_BYTE3(x)    (x |= 0xff000000)   /* 第3個字節置1 */

示例:

(2)置位某一位:

#define SET_BIT(x, bit) (x |= (1 << bit))   /* 置位第bit位 */

4、判斷某一位或某幾位連續位的值

(1)判斷某一位的值

舉例說明:判斷0x68第3位的值。

也就是說,要判斷第幾位的值,if里就左移幾位(當然別過頭了)。在嵌入式編程中,可通過這樣的方式來判斷寄存器的狀態位是否被置位。

(2)判斷某幾位連續位的值

/* 獲取第[n:m]位的值 */
#define BIT_M_TO_N(x, m, n)  ((unsigned int)(x << (31-(n))) >> ((31 - (n)) + (m)))

示例:

這是一個查詢連續狀態位的例子,因為有些情況不止有0、1兩種狀態,可能會有多種狀態,這種情況下就可以用這種方法來取出狀態位,再去執行相應操作。

以上是對32bit數據的一些操作進行總結,其它位數的數據類似,可根據需要進行修改。

三、STM32寄存器配置

STM32有幾套固件庫,這些固件庫函數以函數的形式進行1層或者多層封裝(軟件開發中很重要的思想之一:分層思想),但是到了最裏面的一層就是對寄存器的配置。我們平時都比較喜歡固件庫來開發,大概是因為固件庫用起來比較簡單,用固件庫寫出來的代碼比較容易閱讀。最近一段時間一直在配置寄存器,越發地發現使用寄存器來進行一些外設的配置也是很容易懂的。使用寄存器的方式編程無非就是往寄存器的某些位置1、清零以及對寄存器一些狀態位進行判斷、讀取寄存器的內容等。

這些基本操作在上面的例子中已經有介紹,我們依舊以實例來鞏固上面的知識點(以STM32F1xx為例):

(1)寄存器配置

看一下GPIO功能的端口輸出數據寄存器 (GPIOx_ODR) (x=A..E) :

假設我們要讓PA10引腳輸出高、輸出低,可以這麼做:

方法一:

GPIOA->ODR |= 1 << 10;      /* PA10輸出高(置1操作) */
GPIOA->ODR &= ~(1 << 10);  /* PA10輸出低(清0操作) */

也可用我們上面的置位、清零的宏定義:

SET_BIT(GPIOA->ODR, 10);    /* PA10輸出高(置1操作) */
CLEAR_BIT(GPIOA->ODR, 10);  /* PA10輸出低(清0操作) */

方法二:

GPIOA->ODR |= (uint16_t)0x0400;   /* PA10輸出高(置1操作) */
GPIOA->ODR &= ~(uint16_t)0x0400;  /* PA10輸出低(清0操作) */

貌似第二種方法更麻煩?還得去細心地去構造一個數據。

但是,其實第二種方法其實是ST推薦我們用的方法,為什麼這麼說呢?因為ST官方已經把這些我們要用到的值給我們配好了,在stm32f10x.h中:

這個頭文件中存放的就是外設寄存器的一些位配置。

所以我們的方法二等價於:

GPIOA->ODR |= GPIO_ODR_ODR10;   /* PA10輸出高(置1操作) */
GPIOA->ODR &= ~GPIO_ODR_ODR10;  /* PA10輸出低(清0操作) */

兩種方法都是很好的方法,但方法一似乎更好理解。

配置連續幾位的方法也是一樣的,就不介紹了。簡單介紹配置不連續位的方法,以TIM1的CR1寄存器為例:

設置CEN位為1、設置CMS[1:0]位為01、設置CKD[1:0]位為10:

TIM1->CR1 |= (0x1 << 1)| (0x1 << 5) |(0x2 << 8);

這是組合的寫法。當然,像上面一樣拆開來寫也是可以的。

(2)判斷標誌位

以狀態寄存器(USART_SR) 為例:

判斷RXNE是否被置位:

/* 數據寄存器非空,RXNE標誌置位 */
if (USART1->SR & (1 << 5))
{
    /* 其它代碼 */
    
    USART1->SR &= ~(1 << 5);  /* 清零RXNE標誌 */
}

或者:

/* 數據寄存器非空,RXNE標誌置位 */
if (USART1->SR & USART_SR_RXNE)
{
    /* 其它代碼 */
    
    USART1->SR &= ~USART_SR_RXNE;  /* 清零RXNE標誌 */
}

四、總結

以上就是本次關於位操作的一點總結筆記,有必要掌握。雖然說在用STM32的時候有庫函數可以用,但是最接近芯片內部原理的還是寄存器。有可能之後有用到其它芯片沒有像ST這樣把寄存器相關配置封裝得那麼好,那就不得不直接操控寄存器了。

此外,使用庫函數的方式代碼佔用空間大,用寄存器的話,代碼佔用空間小。之前有個需求,我們能用的Flash的空間大小隻有4KB,遇到類似這樣的情況就不能那麼隨性的用庫函數了。

最後,應用的時候當然是怎麼簡單就怎麼用。學從“難”處學,用從易處用,與君共勉~

END:以上筆記中如有錯誤,歡迎指出!謝謝

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理【其他文章推薦】

3c收購,鏡頭 收購有可能以全新價回收嗎?

台北網頁設計公司這麼多,該如何挑選?? 網頁設計報價省錢懶人包"嚨底家"

網頁設計公司推薦更多不同的設計風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

賣IPHONE,iPhone回收,舊換新!教你怎麼賣才划算?