RocketMQ系列(七)事務消息(數據庫|最終一致性)

終於到了今天了,終於要講RocketMQ最牛X的功能了,那就是事務消息。為什麼事務消息被吹的比較熱呢?近幾年微服務大行其道,整個系統被切成了多個服務,每個服務掌管着一個數據庫。那麼多個數據庫之間的數據一致性就成了問題,雖然有像XA這種強一致性事務的支持,但是這種強一致性在互聯網的應用中並不適合,人們還是更傾向於使用最終一致性的解決方案,在最終一致性的解決方案中,使用MQ保證各個系統之間的數據一致性又是首選。

RocketMQ為我們提供了事務消息的功能,它使得我們投放消息和其他的一些操作保持一個整體的原子性。比如:向數據庫中插入數據,再向MQ中投放消息,把這兩個動作作為一個原子性的操作。貌似其他的MQ是沒有這種功能的。

但是,縱觀全網,講RocketMQ事務消息的博文中,幾乎沒有結合數據庫的,都是直接投放消息,然後講解事務消息的幾個狀態,雖然講的也沒毛病,但是和項目中事務最終一致性的落地方案還相距甚遠。包括我自己在內,在項目中,服務化以後,用MQ保證事務的最終一致性,在網上一搜,根本沒有落地的方案,都是侃侃而談。於是,我寫下這篇博文,結合數據庫,來談一談RocketMQ的事務消息到底怎麼用。

基礎概念

要使用RocketMQ的事務消息,要實現一個TransactionListener的接口,這個接口中有兩個方法,如下:

/**
     * When send transactional prepare(half) message succeed, this method will be invoked to execute local transaction.
     *
     * @param msg Half(prepare) message
     * @param arg Custom business parameter
     * @return Transaction state
     */
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);

/**
     * When no response to prepare(half) message. broker will send check message to check the transaction status, and this
     * method will be invoked to get local transaction status.
     *
     * @param msg Check message
     * @return Transaction state
     */
LocalTransactionState checkLocalTransaction(final MessageExt msg);

RocketMQ的事務消息是基於兩階段提交實現的,也就是說消息有兩個狀態,prepared和commited。當消息執行完send方法后,進入的prepared狀態,進入prepared狀態以後,就要執行executeLocalTransaction方法,這個方法的返回值有3個,也決定着這個消息的命運,

  • COMMIT_MESSAGE:提交消息,這個消息由prepared狀態進入到commited狀態,消費者可以消費這個消息;
  • ROLLBACK_MESSAGE:回滾,這個消息將被刪除,消費者不能消費這個消息;
  • UNKNOW:未知,這個狀態有點意思,如果返回這個狀態,這個消息既不提交,也不回滾,還是保持prepared狀態,而最終決定這個消息命運的,是checkLocalTransaction這個方法。

當executeLocalTransaction方法返回UNKNOW以後,RocketMQ會每隔一段時間調用一次checkLocalTransaction,這個方法的返回值決定着這個消息的最終歸宿。那麼checkLocalTransaction這個方法多長時間調用一次呢?我們在BrokerConfig類中可以找到,

 /**
  * Transaction message check interval.
  */
@ImportantField
private long transactionCheckInterval = 60 * 1000;

這個值是在brokder.conf中配置的,默認值是60*1000,也就是1分鐘。那麼會檢查多少次呢?如果每次都返回UNKNOW,也不能無休止的檢查吧,

/**
 * The maximum number of times the message was checked, if exceed this value, this message will be discarded.
 */
@ImportantField
private int transactionCheckMax = 5;

這個是檢查的最大次數,超過這個次數,如果還返回UNKNOW,這個消息將被刪除。

事務消息中,TransactionListener這個最核心的概念介紹完后,我們看看代碼如何寫吧。

落地案例

我們在數據庫中有一張表,具體如下:

CREATE TABLE `s_term` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `term_year` year(4) NOT NULL ,
  `type` int(1) NOT NULL DEFAULT '1' ,
  PRIMARY KEY (`id`)
) 

字段的具體含義大家不用管,一會我們將向這張表中插入一條數據,並且向MQ中投放消息,這兩個動作是一個原子性的操作,要麼全成功,要麼全失敗。

我們先來看看事務消息的客戶端的配置,如下:

@Bean(name = "transactionProducer",initMethod = "start",destroyMethod = "shutdown")
public TransactionMQProducer transactionProducer() {
    TransactionMQProducer producer = new
        TransactionMQProducer("TransactionMQProducer");
    producer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
    producer.setTransactionListener(transactionListener());
    return producer;
}

@Bean
public TransactionListener transactionListener() {
    return new TransactionListenerImpl();
}

我們使用TransactionMQProducer生命生產者的客戶端,並且生產者組的名字叫做TransactionMQProducer,後面NameServer的地址沒有變化。最後就是設置了一個TransactionListener監聽器,這個監聽器的實現我們也定義了一個Bean,返回的是我們自定義的TransactionListenerImpl,我們看看裡邊怎麼寫的吧。

public class TransactionListenerImpl implements TransactionListener {
    @Autowired
    private TermMapper termMapper;

    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {

        Integer termId = (Integer)arg;
        Term term = termMapper.selectById(termId);
        System.out.println("executeLocalTransaction termId="+termId+" term:"+term);
        if (term != null) return COMMIT_MESSAGE;

        return LocalTransactionState.UNKNOW;
    }

	@Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String termId = msg.getKeys();
        Term term = termMapper.selectById(Integer.parseInt(termId));
        System.out.println("checkLocalTransaction termId="+termId+" term:"+term);
        if (term != null) {
            System.out.println("checkLocalTransaction:COMMIT_MESSAGE");
            return COMMIT_MESSAGE;
        }
        System.out.println("checkLocalTransaction:ROLLBACK_MESSAGE");
        return ROLLBACK_MESSAGE;
    }
}

在這個類中,我們要實現executeLocalTransaction和checkLocalTransaction兩個方法,其中executeLocalTransaction是在執行完send方法后立刻執行的,裡邊我們根據term表的id去查詢,如果能夠查詢出結果,就commit,消費端可以消費這個消息,如果查詢不到,就返回一個UNKNOW,說明過一會會調用checkLocalTransaction再次檢查。在checkLocalTransaction方法中,我們同樣用termId去查詢,這次如果再查詢不到就直接回滾了。

好了,事務消息中最重要的兩個方法都已經實現了,我們再來看看service怎麼寫吧,

@Autowired
private TermMapper termMapper;
@Autowired
@Qualifier("transactionProducer")
private TransactionMQProducer producer;

@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
    Term term = new Term();
    term.setTermYear(2020);
    term.setType(1);
    int insert = termMapper.insert(term);

    Message message = new Message();
    message.setTopic("cluster-topic");
    message.setKeys(term.getId()+"");
    message.setBody(new String("this is transaction mq "+new Date()).getBytes());

    TransactionSendResult sendResult = producer
        .sendMessageInTransaction(message, term.getId());
    System.out.println("sendResult:"+sendResult.getLocalTransactionState() 
                       +" 時間:"+new Date());
}
  • 在sendTransactionMQ方法上,我們使用了@Transactional註解,那麼在這個方法中,發生任何的異常,數據庫事務都會回滾;
  • 然後,我們創建Term對象,向數據庫中插入Term;
  • 構建Mesaage的信息,將termId作為message的key;
  • 使用sendMessageInTransaction發送消息,傳入message和termId,這兩個參數和executeLocalTransaction方法的入參是對應的。

最後,我們在test方法中,調用sendTransactionMQ方法,如下:

@Test
public void sendTransactionMQ() throws InterruptedException {
    try {
        transactionService.sendTransactionMQ();
    } catch (Exception e) {
        e.printStackTrace();
    }

    Thread.sleep(600000);
}

整個生產端的代碼就是這些了,消費端的代碼沒有什麼變化,就不給大家貼出來了。接下來,我們把消費端的應用啟動起來,消費端的應用最好不要包含生產端的代碼,因為TransactionListener實例化以後,就會進行監聽,而我們在消費者端是不希望看到TransactionListener中的日誌的。

我們運行一下生產端的代碼,看看是什麼情況,日誌如下:

executeLocalTransaction termId=15 term:com.example.rocketmqdemo.entity.Term@4a3509b0
sendResult:COMMIT_MESSAGE 時間:Wed Jun 17 08:56:49 CST 2020
  • 我們看到,先執行的是executeLocalTransaction這個方法,termId打印出來了,發送的結果也出來了,是COMMIT_MESSAGE,那麼消費端是可以消費這個消息的;
  • 注意一下兩個日誌的順序,先執行的executeLocalTransaction,說明在執行sendMessageInTransaction時,就會調用監聽器中的executeLocalTransaction,它的返回值決定着這個消息是否真正的投放到隊列中;

再看看消費端的日誌,

msgs.size():1
this is transaction mq Wed Jun 17 08:56:49 CST 2020

消息被正常消費,沒有問題。那麼數據庫中有沒有termId=15的數據呢?我們看看吧,

數據是有的,插入數據也是成功的。

這樣使用就真的正確的嗎?我們改一下代碼看看,在service方法中拋個異常,讓數據庫的事務回滾,看看是什麼效果。改動代碼如下:

@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
    ……
    throw new Exception("數據庫事務異常");
}

拋出異常后,數據庫的事務會回滾,那麼MQ呢?我們再發送一個消息看看,

生產端的日誌如下:

executeLocalTransaction termId=16 term:com.example.rocketmqdemo.entity.Term@5d6b5d3d
sendResult:COMMIT_MESSAGE 時間:Wed Jun 17 09:07:15 CST 2020

java.lang.Exception: 數據庫事務異常
  • 從日誌中,我們可以看到,消息是投放成功的,termId=16,事務的返回狀態是COMMIT_MESSAGE;
  • 最後拋出了我們定義的異常,那麼數據庫中應該是不存在這條消息的啊;

我們先看看數據庫吧,

數據庫中並沒有termId=16的數據,那麼數據庫的事務是回滾了,而消息是投放成功的,並沒有保持原子性啊。那麼為什麼在執行executeLocalTransaction方法時,能夠查詢到termId=16的數據呢?還記得MySQL的事務隔離級別嗎?忘了的趕快複習一下吧。在事務提交前,我們是可以查詢到termId=16的數據的,所以消息提交了,看看消費端的情況,

msgs.size():1
this is transaction mq Wed Jun 17 09:07:15 CST 2020

消息也正常消費了,這明顯不符合我們的要求,我們如果在微服務之間使用這種方式保證數據的最終一致性,肯定會有大麻煩的。那我們該怎麼使用s呢?我們可以在executeLocalTransaction方法中,固定返回UNKNOW,數據插入數據庫成功也好,失敗也罷,我們都返回UNKNOW。那麼這個消息是否投放到隊列中,就由checkLocalTransaction決定了。checkLocalTransaction肯定在sendTransactionMQ后執行,而且和sendTransactionMQ不在同一事務中。我們改一下程序吧,

@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    return LocalTransactionState.UNKNOW;
}

其他的地方不用改,我們再發送一下消息,

sendResult:UNKNOW 時間:Wed Jun 17 09:56:59 CST 2020
java.lang.Exception: 數據庫事務異常

checkLocalTransaction termId=18 term:null
checkLocalTransaction:ROLLBACK_MESSAGE
  • 事務消息發送的結果是UNKNOW,然後拋出異常,事務回滾;
  • checkLocalTransaction方法,查詢termId=18的數據,為null,消息再回滾;

又看了一下消費端,沒有日誌。數據庫中也沒有termId=18的數據,這才符合我們的預期,數據庫插入不成功,消息投放不成功。我們再把拋出異常的代碼註釋掉,看看能不能都成功。

@Transactional(rollbackFor = Exception.class)
public void sendTransactionMQ() throws Exception {
    ……
    //throw new Exception("數據庫事務異常");
}

再執行一下發送端程序,日誌如下:

sendResult:UNKNOW 時間:Wed Jun 17 10:02:57 CST 2020
checkLocalTransaction termId=19 term:com.example.rocketmqdemo.entity.Term@3b643475
checkLocalTransaction:COMMIT_MESSAGE
  • 發送結果返回UNKNOW;
  • checkLocalTransaction方法查詢termId=19的數據,能夠查到;
  • 返回COMMIT_MESSAGE,消息提交到隊列中;

先看看數據庫中的數據吧,

termId=19的數據入庫成功了,再看看消費端的日誌,

msgs.size():1
this is transaction mq Wed Jun 17 10:02:56 CST 2020

消費成功,這才符合我們的預期。數據插入數據庫成功,消息投放隊列成功,消費消息成功。

總結

事務消息最重要的就是TransactionListener接口的實現,我們要理解executeLocalTransaction和checkLocalTransaction這兩個方法是干什麼用的,以及它們的執行時間。再一個就是和數據庫事務的結合,數據庫事務的隔離級別大家要知道。把上面這幾點掌握了,就可以靈活的使用RocketMQ的事務消息了。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

從0到1學會logstash的玩法(ELK)

本篇文章採用的採用的是logstash7.7.0版本,主要從如下幾個方面介紹

1、logstash是什麼,可以用來幹啥

2、logstash的基本原理是什麼

3、怎麼去玩這個elk的組件logstash

一、logstash是什麼,有哪些作用

1.1、概念

官方概念:Logstash是免費且開放的服務器端數據處理管道,能夠從多個來源採集數據,轉換數據,然後將數據發送到您最喜歡的“存儲庫”中。

1.2、功能

Logstash能夠動態地採集、轉換和傳輸數據,不受格式或複雜度的影響。利用Grok從非結構化數據中派生出結構,從IP地址解碼出地理坐標,匿名化或排除敏感字段,並簡化整體處理過程。

採集數據:

  能夠採集各種樣式、大小和來源的數據往往以各種各樣的形式,比如log日誌,收集redis、kafka等熱門分佈式技術的數據,並且還可以收集實現了java的JMS規範的消息中心的數據,或分散或集中地存在於很多系統中。Logstash支持各種輸入選擇,可以同時從眾多常用來源捕捉事件。能夠以連續的流式傳輸方式,輕鬆地從日誌、指標、Web應用、數據存儲以及各種AWS服務採集數據

 

 

 

解析數據和轉換:

  數據從源傳輸到存儲庫的過程中,Logstash過濾器能夠解析各個事件,識別已命名的字段以構建結構,並將它們轉換成通用格式,以便進行更強大的分析和實現商業價值。
Logstash能夠動態地轉換和解析數據,不受格式或複雜度的影響:

  • 利用Grok從非結構化數據中解析出結構數據
  • 從IP地址破譯出地理坐標
  • 將PII數據匿名化,完全排除敏感字段
  • 簡化整體處理,不受數據源、格式或架構的影響

  數據輸入端從各種數據源收集到的數據可能會有很多不是我們想要的,這時我們可以給Logstash定義過濾器,過濾器可以定義多個,它們依次執行,最終把我們想要的數據過濾出來,然後把這些數據解析成目標數據庫,如elasticsearch等能支持的數據格式存儲數據。

 

數據轉存:

  選擇好存儲庫,導出數據到存儲庫進行存儲,儘管Elasticsearch是我們的首選輸出方向,能夠為我們的搜索和分析帶來無限可能,但它並非唯一選擇。Logstash提供眾多輸出選擇,可以將數據發送到您要指定的地方,比如redis、kafka等

 

 

 

 

二、logstash的基本原理

  logstash分為三個步驟:inputs(必須的)→ filters(可選的)→ outputs(必須的),inputs生成時間,filters對其事件進行過濾和處理,outputs輸出到輸出端或者決定其存儲在哪些組件里。inputs和outputs支持編碼和解碼。

 

執行模型:

Logstash是協調inputs、filters和outputs執行事件處理的管道。

  Logstash管道中的每個input階段都在自己的線程中運行。將寫事件輸入到內存(默認)或磁盤上的中心隊列。每個管道工作線程從該隊列中取出一批事件,通過配置的filter處理該批事件,然後通過output輸出到指定的組件存儲。管道處理數據量的大小和管道工作線程的數量是可配置的

  默認情況下,Logstash使用管道階段(input→filter和filter→output)之間的內存限制隊列來緩衝事件。如果Logstash不安全地終止,存儲在內存中的所有事件都將丟失。為了幫助防止數據丟失,可以啟用Logstash將飛行中的事件持久化到磁盤。有關詳細信息,請參閱持久隊列https://www.elastic.co/guide/en/logstash/current/persistent-queues.html

如下是目前logstash7.7.0支持的inputs、outputs、filters

inputs:

azure_event_hubs,beats,cloudwatch,couchdb_changes,dead_letter_queue,elasticsearch,exec,file,ganglia,gelf,generator,github,google_cloud_storage,google_pubsub,graphite,heartbeat,http,http_poller,imap,irc,java_generator,java_stdin,jdbc,jms,jmx,kafka,kinesis,log4j,lumberjack,meetup,pipe,puppet_facter,rabbitmq,redis,relp,rss,s3,s3-sns-sqs,salesforce,snmp,snmptrap,sqlite,sqs,stdin,stomp,syslog,tcp,twitter,udp,unix,varnishlog,websocket,wmi,xmpp

outputs:

boundary, circonus, cloudwatch, csv, datadog, datadog_metrics, elastic_app_search, elasticsearch, email, exec, file, ganglia, gelf, google_bigquery, google_cloud_storage, google_pubsub, graphite, graphtastic, http, influxdb, irc, sink, java_stdout, juggernaut, kafka, librato, loggly, lumberjack, metriccatcher, mongodb, nagios, nagios_nsca, opentsdb, pagerduty, pipe, rabbitmq, redis, redmine, riak, riemann, s3, sns, solr_http, sqs, statsd, stdout, stomp, syslog, tcp, timber, udp, webhdfs, websocket, xmpp, zabbix

filters:

aggregate, alter, bytes, cidr, cipher, clone, csv, date, de_dot, dissect, dns, drop, elapsed, elasticsearch, environment, extractnumbers, fingerprint, geoip, grok, http, i18n, java_uuid, jdbc_static, jdbc_streaming, json, json_encode, kv, memcached, metricize, metrics, mutate, prune, range, ruby, sleep, split, syslog_pri, threats_classifier, throttle, tld, translate, truncate, urldecode, useragent, uuid, xml

三、玩一玩logstash

3.1、壓縮包方式安裝

下載地址1:https://www.elastic.co/cn/downloads/logstash   

下載地址2:https://elasticsearch.cn/download/

這裏需要安裝jdk,我使用的是elasticsearch7.7.0自帶的jdk:

解壓即安裝:

tar -zxvf logstash-7.7.0.tar.gz

來個logstash版本的HelloWorld:

./bin/logstash -e 'input { stdin { } } output { stdout {} }'

 

 

 

3.2、logstash配置文件

 logstash.yml:包含Logstash配置標誌。您可以在此文件中設置標誌,而不是在命令行中傳遞標誌。在命令行上設置的任何標誌都會覆蓋logstash中的相應設置

 pipelines.yml:包含在單個Logstash實例中運行多個管道的框架和指令。

 jvm.options:包含JVM配置標誌。使用此文件設置總堆空間的初始值和最大值。您還可以使用此文件為Logsta設置語言環境

 log4j2.properties:包含log4j 2庫的默認設置

 start.options (Linux):用於配置啟動服務腳本

logstash.yml文件詳解:

node.name  #默認主機名,該節點的描述名字
path.data  #LOGSTASH_HOME/data ,Logstash及其插件用於任何持久需求的目錄
pipeline.id #默認main,pipeline的id
pipeline.java_execution #默認true,使用java執行引擎
pipeline.workers #默認為主機cpu的個數,表示并行執行管道的過濾和輸出階段的worker的數量
pipeline.batch.size #默認125 表示單個工作線程在嘗試執行過濾器和輸出之前從輸入中收集的最大事件數
pipeline.batch.delay #默認50 在創建管道事件時,在將一個小批分派給管道工作者之前,每個事件需要等待多長時間(毫秒)
pipeline.unsafe_shutdown  #默認false,當設置為true時,即使內存中仍有運行的事件,強制Logstash在關閉期間將會退出。默認情況下,Logstash將拒絕退出,直到所有接收到的事件都被推入輸出。啟用此選項可能導致關機期間數據丟失
pipeline.ordered #默認auto,設置管道事件順序。true將強制對管道進行排序,如果有多個worker,則阻止logstash啟動。如果為false,將禁用維持秩序所需的處理。訂單順序不會得到保證,但可以節省維護訂單的處理成本
path.config #默認LOGSTASH_HOME/config  管道的Logstash配置的路徑
config.test_and_exit #默認false,設置為true時,檢查配置是否有效,然後退出。請注意,使用此設置不會檢查grok模式的正確性
config.reload.automatic #默認false,當設置為true時,定期檢查配置是否已更改,並在更改時重新加載配置。這也可以通過SIGHUP信號手動觸發
config.reload.interval  #默認3s ,檢查配置文件頻率
config.debug #默認false 當設置為true時,將完全編譯的配置显示為調試日誌消息
queue.type #默認memory ,用於事件緩衝的內部排隊模型。為基於內存中的遺留隊列指定內存,或為基於磁盤的脫機隊列(持久隊列)指定持久內存
path.queue #默認path.data/queue  ,在啟用持久隊列時存儲數據文件的目錄路徑
queue.page_capacity #默認64mb ,啟用持久隊列時(隊列),使用的頁面數據文件的大小。隊列數據由分隔為頁面的僅追加數據文件組成
queue.max_events #默認0,表示無限。啟用持久隊列時,隊列中未讀事件的最大數量
queue.max_bytes  #默認1024mb,隊列的總容量,以字節為單位。確保磁盤驅動器的容量大於這裏指定的值
queue.checkpoint.acks #默認1024,當啟用持久隊列(隊列)時,在強制執行檢查點之前被隔離的事件的最大數量
queue.checkpoint.writes #默認1024,當啟用持久隊列(隊列)時,強制執行檢查點之前的最大寫入事件數
queue.checkpoint.retry #默認false,啟用后,對於任何失敗的檢查點寫,Logstash將對每個嘗試的檢查點寫重試一次。任何後續錯誤都不會重試。並且不推薦使用,除非是在那些特定的環境中
queue.drain #默認false,啟用后,Logstash將等待,直到持久隊列耗盡,然後關閉
path.dead_letter_queue#默認path.data/dead_letter_queue,存儲dead-letter隊列的目錄
http.host #默認"127.0.0.1" 表示endpoint REST端點的綁定地址。
http.port #默認9600 表示endpoint REST端點的綁定端口。
log.level #默認info,日誌級別fatal,error,warn,info,debug,trace,
log.format #默認plain 日誌格式
path.logs  #默認LOGSTASH_HOME/logs 日誌目錄

3.3、keystore

keystore可以保護一些敏感的信息,使用變量的方式替代,比如使用ES_PWD代替elasticsearch的密碼,可以通過${ES_PWD}來獲取elasticsearch的密碼,這樣就是的密碼不再是明文密碼。

./bin/logstash-keystore create   #創建一個keyword
 ./bin/logstash-keystore add ES_PWD #創建一個elastic的passwd,然後通過${ES_PWD}使用該密碼
 ./bin/logstash-keystore list  #查看已經設置好的鍵值對
 ./bin/logstash-keystore remove ES_PWD #刪除在keyword中的key

例如:

 

 

 3.4、logstash命令

注意:參數和logstash.yml配置文件對應(這裏不詳解,請查看3.2節)

-n, --node.name NAME          
-f, --path.config CONFIG_PATH 
-e, --config.string CONFIG_STRING
--field-reference-parser MODE 
--modules MODULES             
-M, --modules.variable MODULES_VARIABLE
--setup                      
--cloud.id CLOUD_ID                                            
--cloud.auth CLOUD_AUTH       
--pipeline.id ID              
-w, --pipeline.workers COUNT 
--pipeline.ordered ORDERED   
--java-execution                                               
--plugin-classloaders         
-b, --pipeline.batch.size SIZE 
-u, --pipeline.batch.delay DELAY_IN_MS 
--pipeline.unsafe_shutdown    
--path.data PATH              
-p, --path.plugins PATH       
-l, --path.logs PATH         
--log.level LEVEL             
--config.debug                
-i, --interactive SHELL      
-V, --version                 
-t, --config.test_and_exit    
-r, --config.reload.automatic 
--config.reload.interval 
--http.host HTTP_HOST         
--http.port HTTP_PORT         
--log.format FORMAT           
--path.settings SETTINGS_DIR

3.5、logstash配置文件的格式和value type

1、logstash配置文件的格式如下:

輸入,解析過濾,輸出,其中filter不是必須的,其他兩個是必須的。

input {
  ...
}

filter {
  ...
}

output {
  ...
}

2、value types(logstash支持的數據類型)

array:數組可以是單個或者多個字符串值。

users => [ {id => 1, name => bob}, {id => 2, name => jane} ]

Lists:集合

path => [ "/var/log/messages", "/var/log/*.log" ]
uris => [ "http://elastic.co", "http://example.net" ]

Boolean:true 或者false

ssl_enable => true

Bytes:字節類型

my_bytes => "1113"   # 1113 bytes
my_bytes => "10MiB"  # 10485760 bytes
my_bytes => "100kib" # 102400 bytes
my_bytes => "180 mb" # 180000000 bytes

Codec:編碼類型

codec => "json"

Hash:哈希(散列)

match => {
  "field1" => "value1"
  "field2" => "value2"
  ...
}
# or as a single line. No commas between entries:
match => { "field1" => "value1" "field2" => "value2" }

Number:数字類型

port => 33

Password:密碼類型

my_password => "password"

URI:uri類型

my_uri => "http://foo:bar@example.net"

Path: 路徑類型

my_path => "/tmp/logstash"

String:字符串類型,字符串必須是單個字符序列。注意,字符串值被括在雙引號或單引號中

3.6、logstash的權限配置

配置賬號,一個種是role,一種是user,配置方式有兩種,一種是通過elasticsearch的API配置,一種是通過kibana配置:

第一種方式:通過elasticsearch的API配置:

#添加一個logstash_writer的角色
POST _xpack/security/role/logstash_writer
{
  "cluster": ["manage_index_templates", "monitor", "manage_ilm"], 
  "indices": [
    {
      "names": [ "logstash-*" ],  #索引的模式匹配
      "privileges": ["write","create","delete","create_index","manage","manage_ilm"]   #權限內容
    }
  ]
}

#添加一個有logstash_writer角色權限的用戶:logstash_internal
POST _xpack/security/user/logstash_internal
{
  "password" : "x-pack-test-password",
  "roles" : [ "logstash_writer"],  #分配角色
  "full_name" : "Internal Logstash User"
}

#添加一個logstash_reader角色,只有read權限
POST _xpack/security/role/logstash_reader
{
  "indices": [
    {
      "names": [ "logstash-*" ], 
      "privileges": ["read","view_index_metadata"]
    }
  ]
}

#添加一個有logstash_reader角色權限的用戶:logstash_user
POST _xpack/security/user/logstash_user
{
  "password" : "x-pack-test-password",
  "roles" : [ "logstash_reader", "logstash_admin"], 
  "full_name" : "Kibana User for Logstash"
}

第二種:通過kibana的界面配置

Management > Roles
Management > Users

 

 

 權限選擇見elasticsearch官網:

https://www.elastic.co/guide/en/elasticsearch/reference/current/authorization.html

https://www.elastic.co/guide/en/elasticsearch/reference/current/security-privileges.html

 3.7、多管道配置

  如果需要在同一個進程中運行多個管道,Logstash提供了一種通過名為pipelines.yml的配置文件來實現此目的的方法。

例如:

- pipeline.id: my-pipeline_1
  path.config: "/etc/path/to/p1.config"
  pipeline.workers: 3
- pipeline.id: my-other-pipeline
  path.config: "/etc/different/path/p2.cfg"
  queue.type: persisted

  該文件在YAML文件格式中,並包含一個字典列表,其中每個字典描述一個管道,每個鍵/值對指定該管道的設置。該示例展示了通過id和配置路徑描述的兩個不同管道。對於第一個管道,為pipeline.workers的值設置為3,而在另一个中,持久隊列特性被啟用。未在pipelines.yml顯式設置的值。yml文件將使用到logstash中指定的默認值。
  當啟動Logstash不帶參數時,它將讀取管道pipelines.yml。yml文件並實例化文件中指定的所有管道。另一方面,當使用-e或-f時,Logstash會忽略管道。

注意:

  • 如果當前配置的事件流不共享相同的輸入/過濾器和輸出,並且使用標記和條件將它們彼此分離,這顯得多個管道尤其重要
  • 在單個實例中擁有多個管道還允許這些事件流具有不同的性能和持久性參數(例如,pipeline.workers和persistent queues的不同設置)。這種分離意味着一條管道中的阻塞輸出不會對另一條管道產生反壓力。
  • 考慮管道之間的資源競爭是很重要的,因為默認值是針對單個管道調優的。因此,例如,考慮減少每個管道使用pipeline.worker的數量,因為每個管道在默認情況下每個CPU核使用一個worker。
  • 每個管道都隔離持久隊列和死信隊列,它們的位置命名空間為pipeline.id的值

各管道之間的通信原理:https://www.elastic.co/guide/en/logstash/current/pipeline-to-pipeline.html,有興趣的可以了解下。

3.8、配置的重新加載

在我們運行logstash的過程,不想停掉logstash進程,但是又想修改配置,就可以使用到配置的重新加載了,有兩種方式。

第一種是在啟動的時候指定參數:

bin/logstash -f apache.config --config.reload.automatic

Logstash每3秒檢查一次配置更改。要更改此間隔,請使用–config.reload.interval <interval>選項,其中interval指定Logstash檢查配置文件更改的頻率(以秒為單位),請注意,必須使用單位限定符(s)

第二種就是強制加載配置文件

kill -SIGHUP pid  #pid為logstash的pid

自動配置重新加載配置注意點:

  • 當Logstash檢測到配置文件中的更改時,它將通過停止所有輸入來停止當前管道,並嘗試創建使用更新后的配置的新管道。驗證新配置的語法后,Logstash驗證所有輸入和輸出都可以初始化(例如,所有必需的端口都已打開)。如果檢查成功,則Logstash會將現有管道與新管道交換。如果檢查失敗,舊管道將繼續運行,並且錯誤將傳播到控制台。
  • 在自動重新加載配置期間,不會重新啟動JVM。管道的創建和交換都在同一過程中進行。
  • 對grok模式文件的更改也將重新加載,但僅在配置文件中的更改觸發重新加載(或重新啟動管道)時。

3.9、常用filter介紹

1、grok

注:grok插件是一個十分耗費資源的插件

官網:https://www.elastic.co/guide/en/logstash/current/plugins-filters-grok.html

Grok是將非結構化日誌數據解析為結構化和可查詢內容的好方法,非常適合syslog日誌,apache和其他Web服務器日誌,mysql日誌等等

首先官方提供了120中的匹配模式(但是我一直都沒打開這個網址):https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns

還有一個用來驗證自定義的解析是否正確的一個網址:http://grokdebug.herokuapp.com/

既然我沒能打開官方提供的120個的模式匹配,從一片博客中找到了一部分的匹配模式(如下):https://blog.csdn.net/cui929434/article/details/94390617

USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b

POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}

# Networking
MAC (?:%{CISCOMAC}|%{WINDOWSMAC}|%{COMMONMAC})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
IP (?:%{IPV6}|%{IPV4})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
HOST %{HOSTNAME}
IPORHOST (?:%{HOSTNAME}|%{IP})
HOSTPORT %{IPORHOST}:%{POSINT}

# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (?>/(?>[\w_%!$@:.,-]+|\\.)*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?

# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])

# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)

# Years?
YEAR (?>\d\d){1,2}
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}

# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}

# Shortcuts
QS %{QUOTEDSTRING}

# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}

# Log Levels
LOGLEVEL ([Aa]lert|ALERT|[Tt]race|TRACE|[Dd]ebug|DEBUG|[Nn]otice|NOTICE|[Ii]nfo|INFO|[Ww]arn?(?:ing)?|WARN?(?:ING)?|[Ee]rr?(?:or)?|ERR?(?:OR)?|[Cc]rit?(?:ical)?|CRIT?(?:ICAL)?|[Ff]atal|FATAL|[Ss]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)

grok內置的一些匹配模式

基礎語法,一種是自帶的模式,一種是自定義的模式:

自帶的模式語法:  %{SYNTAX:SEMANTIC}

SYNTAX是將匹配文本模式的名稱,grok自帶的那些匹配模式名
SEMANTIC是你給一段文字的標識相匹配該匹配模式匹配到的內容,相當於一個字段名

例如:

%{NUMBER:duration} %{IP:client}

比如要解析如下的日誌:

55.3.244.1 GET /index.html 15824 0.043

就可以使用該匹配模式去匹配:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

得出的結果如下:

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043

 

自定義的模式語法:(?<field_name>the pattern here)

例如:

(?<queue_id>[0-9A-F]{10,11})  #表示10-11個字符的16進制

我們可以創建一個目錄pattters,把我自定義的模式添加進去,在使用的時候就可以使用grok自帶的匹配模式的語法,例如:

我們在./patterns/postfix文件中添加如下內容
POSTFIX_QUEUEID [0-9A-F]{10,11}

如上我們就定義了一個匹配模式了,可以使用如下的方式使用。假如我們有如下的日誌格式:

Jan  1 06:25:43 mailserver14 postfix/cleanup[21403]: BEF25A72965: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

然後我們對其就行解析:

filter {
      grok {
        patterns_dir => ["./patterns"]  #指定自定義的匹配模式路徑
        match => { "message" => "%{SYSLOGBASE} %{POSTFIX_QUEUEID:queue_id}: %{GREEDYDATA:syslog_message}" }
      }
    }

解析出的結果如下:

timestamp: Jan  1 06:25:43
logsource: mailserver14
program: postfix/cleanup
pid: 21403
queue_id: BEF25A72965
syslog_message: message-id=<20130101142543.5828399CCAF@mailserver14.example.com>

 grok的配置選項

break_on_match
值類型是布爾值
默認是true
描述:match可以一次設定多組,預設會依照順序設定處理,如果日誌滿足設定條件,則會終止向下處理。但有的時候我們會希望讓Logstash跑完所有的設定,這時可以將break_on_match設為false。

keep_empty_captures
值類型是布爾值
默認值是 false
描述:如果為true,捕獲失敗的字段將設置為空值

match
值類型是數組
默認值是 {}
描述:字段值的模式匹配
例如:
filter {
  grok { match => { "message" => "Duration: %{NUMBER:duration}" } }
}
#如果你需要針對單個字段匹配多個模式,則該值可以是一組,例如:
filter {
  grok { match => { "message" => [ "Duration: %{NUMBER:duration}", "Speed: %{NUMBER:speed}" ] } }
}

named_captures_only
值類型是布爾值
默認值是 true
描述:如果設置為true,則僅存儲來自grok的命名捕獲

overwrite
值類型是 array
默認是[]
描述:覆蓋已經存在的字段內容
例如:

filter {
  grok {
    match => { "message" => "%{SYSLOGBASE} %{DATA:message}" }
    overwrite => [ "message" ]
  }
}
如果日誌是May 29 16:37:11 sadness logger: hello world經過match屬性match => { “message” => “%{SYSLOGBASE} %{DATA:message}” }處理后,message的值變成了hello world。這時如果使用了overwrite => [ “message” ]屬性,那麼原來的message的值將被覆蓋成新值。

pattern_definitions
值類型是 數組
默認值是 {}
描述:模式名稱和模式正則表達式,也是用於定義當前過濾器要使用的自定義模式。匹配現有名稱的模式將覆蓋預先存在的定義。可以將此視為僅適用於grok定義的內聯模式,patterns_dir是將模式寫在外部。
例如:
filter {
    grok {
        patterns_dir => "/usr/local/elk/logstash/patterns"
        pattern_definitions => {"MYSELFTIMESTAMP" => "20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND})"}
        match => {"message" => ["%{MYSELFTIMESTAMP:timestamp} %{JAVACLASS:message}","%{MYSELF:content}"]}
    }
}

patterns_dir
值類型是數組
默認值是 []
描述:一些複雜的正則表達式,不適合直接寫到filter中,可以指定一個文件夾,用來專門保存正則表達式的文件,需要注意的是該文件夾中的所有文件中的正則表達式都會被依次加載,包括備份文件。

patterns_dir => ["/opt/logstash/patterns", "/opt/logstash/extra_patterns"]
正則文件以文本格式描述:

patterns_file_glob
屬性值的類型:string
默認值:“*”
描述:針對patterns_dir屬性中指定的文件夾里哪些正則文件,可以在這個filter中生效,需要本屬性來指定。默認值“*”是指所有正則文件都生效。

tag_on_failure
值類型是數組
默認值是 [“_grokparsefailure”]
描述:沒有成功匹配時,將值附加到字段到tags

tag_on_timeout
值類型是字符串
默認值是 “_groktimeout”
描述:如果Grok正則表達式超時,則應用標記。

timeout_millis
值類型是数字
默認值是 30000
描述: 嘗試在這段時間后終止正則表達式。如果應用了多個模式,則這適用於每個模式。這將永遠不會提前超時,但超時可能需要一些時間。實際的超時時間是基於250ms量化的近似值。設置為0以禁用超時。

各組件的公共配置選項

break_on_match
值類型是布爾值
默認是true
描述:match可以一次設定多組,預設會依照順序設定處理,如果日誌滿足設定條件,則會終止向下處理。但有的時候我們會希望讓Logstash跑完所有的設定,這時可以將break_on_match設為false。

keep_empty_captures
值類型是布爾值
默認值是 false
描述:如果為true,捕獲失敗的字段將設置為空值

match
值類型是數組
默認值是 {}
描述:字段值的模式匹配
例如:

filter {
  grok { match => { "message" => "Duration: %{NUMBER:duration}" } }
}

#如果你需要針對單個字段匹配多個模式,則該值可以是一組,例如:
filter {
  grok { match => { "message" => [ "Duration: %{NUMBER:duration}", "Speed: %{NUMBER:speed}" ] } }
}

named_captures_only
值類型是布爾值
默認值是 true
描述:如果設置為true,則僅存儲來自grok的命名捕獲

overwrite
值類型是 array
默認是[]
描述:覆蓋已經存在的字段內容
例如:

filter {
  grok {
    match => { "message" => "%{SYSLOGBASE} %{DATA:message}" }
    overwrite => [ "message" ]
  }
}

如果日誌是May 29 16:37:11 sadness logger: hello world經過match屬性match => { “message” => “%{SYSLOGBASE} %{DATA:message}” }處理后,message的值變成了hello world。這時如果使用了overwrite => [ “message” ]屬性,那麼原來的message的值將被覆蓋成新值。

pattern_definitions
值類型是 數組
默認值是 {}
描述:模式名稱和模式正則表達式,也是用於定義當前過濾器要使用的自定義模式。匹配現有名稱的模式將覆蓋預先存在的定義。可以將此視為僅適用於grok定義的內聯模式,patterns_dir是將模式寫在外部。
例如:

filter {
    grok {
        patterns_dir => "/usr/local/elk/logstash/patterns"
        pattern_definitions => {"MYSELFTIMESTAMP" => "20%{YEAR}-%{MONTHNUM}-%{MONTHDAY} %{HOUR}:?%{MINUTE}(?::?%{SECOND})"}
        match => {"message" => ["%{MYSELFTIMESTAMP:timestamp} %{JAVACLASS:message}","%{MYSELF:content}"]}
    }
}

patterns_dir
值類型是數組
默認值是 []
描述:一些複雜的正則表達式,不適合直接寫到filter中,可以指定一個文件夾,用來專門保存正則表達式的文件,需要注意的是該文件夾中的所有文件中的正則表達式都會被依次加載,包括備份文件。

patterns_dir => ["/opt/logstash/patterns", "/opt/logstash/extra_patterns"]
正則文件以文本格式描述:


patterns_file_glob
屬性值的類型:string
默認值:“*”
描述:針對patterns_dir屬性中指定的文件夾里哪些正則文件,可以在這個filter中生效,需要本屬性來指定。默認值“*”是指所有正則文件都生效。

tag_on_failure
值類型是數組
默認值是 [“_grokparsefailure”]
描述:沒有成功匹配時,將值附加到字段到tags

tag_on_timeout
值類型是字符串
默認值是 “_groktimeout”
描述:如果Grok正則表達式超時,則應用標記。

timeout_millis
值類型是数字
默認值是 30000
描述: 嘗試在這段時間后終止正則表達式。如果應用了多個模式,則這適用於每個模式。這將永遠不會提前超時,但超時可能需要一些時間。實際的超時時間是基於250ms量化的近似值。設置為0以禁用超時。

常用選項
所有過濾器插件都支持以下配置選項:


dd_field
值類型是散列
默認值是 {}
描述:如果匹配成功,向此事件添加任意字段。字段名可以是動態的,並使用%{Field}包含事件的一部分
filter {
      grok {
        add_field => { "foo_%{somefield}" => "Hello world, from %{host}" }
      }
    }

# 你也可以一次添加多個字段
filter {
  grok {
    add_field => {
      "foo_%{somefield}" => "Hello world, from %{host}"
      "new_field" => "new_static_value"
    }
  }
}

add_tag
值類型是數組
默認值是 []
描述:如果此過濾器成功,請向該事件添加任意標籤。標籤可以是動態的,並使用%{field} 語法包含事件的一部分。
例如:

filter {
  grok {
    add_tag => [ "foo_%{somefield}" ]
  }
}

# 你也可以一次添加多個標籤
filter {
  grok {
    add_tag => [ "foo_%{somefield}", "taggedy_tag"]
  }
}

enable_metric
值類型是布爾值
默認值是 true
描述:禁用或啟用度量標準

id
值類型是字符串
此值沒有默認值。
描述:向插件實例添加唯一ID,此ID用於跟蹤插件特定配置的信息。
例如:

filter {
  grok {
    id => "ABC"
  }
}

periodic_flush
值類型是布爾值
默認值是 false
描述:如果設置為ture,會定時的調用filter的更新函數(flush method)

remove_field
值的類型:array
默認值:[]
描述:刪除當前文檔中的指定filted

filter {
  grok {
    remove_field => [ "foo_%{somefield}" ]
  }
}
# 你也可以一次移除多個字段:
filter {
  grok {
    remove_field => [ "foo_%{somefield}", "my_extraneous_field" ]
  }
}

remove_tag
值類型是數組
默認值是 []
描述:如果此過濾器成功,請從該事件中移除任意標籤。標籤可以是動態的,並使用%{field} 語法包括事件的一部分。
例如:

filter {
  grok {
    remove_tag => [ "foo_%{somefield}" ]
  }
}
# 你也可以一次刪除多個標籤
filter {
  grok {
    remove_tag => [ "foo_%{somefield}", "sad_unwanted_tag"]
  }
}

2、mutate

官網網址:https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html

mutate允許對字段執行常規改變。可以重命名、刪除、替換和修改事件中的字段。

二話不說,來個例子先:

filter {
    mutate {
        split => ["hostname", "."]  #切分
        add_field => { "shortHostname" => "%{hostname[0]}" } #獲取切分后的第一個字段作為添加字段
    }
    mutate {
        rename => ["shortHostname", "hostname" ] #重命名
    }
}

mutate的配置選項:

常用的一些操作:
convert:轉換數據類型,數據類型hash
可以裝換的數據類型有:integer,string,integer_eu(和integer相同,显示格式為1.000),float,float_eu,boolean
實例:
filter {
      mutate {
        convert => {
          "fieldname" => "integer"
          "booleanfield" => "boolean"
        }
      }
    }

copy:類型hash,將現有字段複製到另一個字段。現有的目標域將被覆蓋
實例:
 filter {
      mutate {
         copy => { "source_field" => "dest_field" }
      }
    }
    
gsub:類型array,根據字段值匹配正則表達式,並用替換字符串替換所有匹配項。只支持字符串或字符串數組的字段。對於其他類型的字段,將不採取任何操作。
實例:
filter {
      mutate {
        gsub => [
          # replace all forward slashes with underscore
          "fieldname", "/", "_",
          # replace backslashes, question marks, hashes, and minuses
          # with a dot "."
          "fieldname2", "[\\?#-]", "."
        ]
      }
    }
    
join:類型hash,用分隔符連接數組。對非數組字段不執行任何操作
實例:
filter {
     mutate {
       join => { "fieldname" => "," }
     }
   }
  
lowercase:類型array,轉為小寫
實例:
filter {
      mutate {
        lowercase => [ "fieldname" ]
      }
    }
    
merge:類型hash,合併數組或散列的兩個字段。字符串字段將自動轉換為數組
實例:
filter {
     mutate {
        merge => { "dest_field" => "added_field" }
     }
   }
   
coerce:類型hash,設置存在但為空的字段的默認值
實例:
filter {
      mutate {
        # Sets the default value of the 'field1' field to 'default_value'
        coerce => { "field1" => "default_value" }
      }
    }
    
rename:類型hash,重命名
實例:
filter {
      mutate {
        # Renames the 'HOSTORIP' field to 'client_ip'
        rename => { "HOSTORIP" => "client_ip" }
      }
    }
    
replace:類型hash,用新值替換字段的值
實例:
filter {
      mutate {
        replace => { "message" => "%{source_host}: My new message" }
      }
    }
    
split:類型hash,使用分隔符將字段分割為數組。只對字符串字段有效
實例:
filter {
      mutate {
         split => { "fieldname" => "," }
      }
    }
    
strip:類型array,字段中刪除空白。注意:這隻對前導和后導空格有效。
實例:
filter {
      mutate {
         strip => ["field1", "field2"]
      }
    }
    
update:類型hash,使用新值更新現有字段。如果該字段不存在,則不採取任何操作。
實例:
filter {
      mutate {
        update => { "sample" => "My new message" }
      }
    }
    
uppercase:類型array,轉為大寫
實例:
filter {
      mutate {
        uppercase => [ "fieldname" ]
      }
    }
    
capitalize:類型array,將字符串轉換為等效的大寫字母。
實例:
filter {
      mutate {
        capitalize => [ "fieldname" ]
      }
    }

tag_on_failure:類型string,如果在應用此變異篩選器期間發生故障,則終止其餘操作,默認值:_mutate_error

公共配置見:grok公共配置

3、date

date filter用於從字段解析日期,然後使用該日期或時間戳作為事件的logstash時間戳

date常用的配置選項:

match:類型array,字段名在前,格式模式在後的數組[ field,formats... ],表示該字段能夠匹配到的時間模式,時間模式可以有多種
實例:
filter {
      date {
        match => [ "logdate", "MMM dd yyyy HH:mm:ss" ]
      }
    }
    
tag_on_failure:類型array,默認值["_dateparsefailure"],當沒有成功匹配時,將值追加到tags字段

target:類型String,默認值"@timestamp",將匹配的時間戳存儲到給定的目標字段中。如果沒有提供,默認更新事件到@timestamp字段。

timezone:類型String,表示時區,可以在該網址查看:http://joda-time.sourceforge.net/timezones.html

公共配置見:grok公共配置

這裏只對如上三種filter說明,具體其他的filter請見官網:https://www.elastic.co/guide/en/logstash/current/filter-plugins.html

3.10、案例一、apache日誌解析

這個例子屬官網的一個例子:https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html

但是我這裏不弄這麼負責,我們不使用filebeat,直接使用logstash,apache日誌的數據集下載:https://download.elastic.co/demos/logstash/gettingstarted/logstash-tutorial.log.gz

我這裏不打算安裝apach,所以直接使用官方提供的數據集。

下載數據集,然後解壓文件,就可以得到我們的一個日誌文件:logstash-tutorial.log

首先我們看一下apache日誌的格式:

[elk@lgh ~]$ tail -3 logstash-tutorial.log 
86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] "GET /projects/xdotool/ HTTP/1.1" 200 12292 "http://www.haskell.org/haskellwiki/Xmonad/Frequently_asked_questions" "Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0"
86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] "GET /reset.css HTTP/1.1" 200 1015 "http://www.semicomplete.com/projects/xdotool/" "Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0"
86.1.76.62 - - [04/Jan/2015:05:30:37 +0000] "GET /style2.css HTTP/1.1" 200 4877 "http://www.semicomplete.com/projects/xdotool/" "Mozilla/5.0 (X11; Linux x86_64; rv:24.0) Gecko/20140205 Firefox/24.0 Iceweasel/24.3.0"

然後開始配置:

cd logstash-7.7.0/ && mkdir conf.d
cd conf.d/ 
vim  apache.conf
#############apache.conf的內容如下###################
input {
        file {
            path => "/home/elk/logstash-tutorial.log"
            type => "log"
            start_position => "beginning"
            }
    }
filter {
    grok {
        match => { "message" => "%{COMBINEDAPACHELOG}"}
    }
}

output {
    stdout { codec => rubydebug }
}

然後啟動命令(可以選擇用nohup後台啟動):

 cd logstash-7.7.0/ && ./bin/logstash -f conf.d/apache.conf

執行結果如下(部分結果):

{
           "verb" => "GET",
          "bytes" => "8948",
           "type" => "log",
           "host" => "gxt_126_233",
    "httpversion" => "1.0",
        "message" => "67.214.178.190 - - [04/Jan/2015:05:20:59 +0000] \"GET /blog/geekery/installing-windows-8-consumer-preview.html HTTP/1.0\" 200 8948 \"http://www.semicomplete.com/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.7; rv:21.0) Gecko/20100101 Firefox/21.0\"",
      "timestamp" => "04/Jan/2015:05:20:59 +0000",
       "referrer" => "\"http://www.semicomplete.com/\"",
     "@timestamp" => 2020-06-17T01:23:47.817Z,
           "path" => "/data/hd05/elk/logstash-tutorial.log",
          "ident" => "-",
       "response" => "200",
       "@version" => "1",
        "request" => "/blog/geekery/installing-windows-8-consumer-preview.html",
       "clientip" => "67.214.178.190",
          "agent" => "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.7; rv:21.0) Gecko/20100101 Firefox/21.0\"",
           "auth" => "-"
}
{
           "verb" => "GET",
          "bytes" => "1015",
           "type" => "log",
           "host" => "gxt_126_233",
    "httpversion" => "1.1",
        "message" => "66.249.73.185 - - [04/Jan/2015:05:18:48 +0000] \"GET /reset.css HTTP/1.1\" 200 1015 \"-\" \"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)\"",
      "timestamp" => "04/Jan/2015:05:18:48 +0000",
       "referrer" => "\"-\"",
     "@timestamp" => 2020-06-17T01:23:47.815Z,
           "path" => "/data/hd05/elk/logstash-tutorial.log",
          "ident" => "-",
       "response" => "200",
       "@version" => "1",
        "request" => "/reset.css",
       "clientip" => "66.249.73.185",
          "agent" => "\"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)\"",
           "auth" => "-"
}
{
           "verb" => "GET",
          "bytes" => "28370",
           "type" => "log",
           "host" => "gxt_126_233",
    "httpversion" => "1.0",
        "message" => "207.241.237.220 - - [04/Jan/2015:05:21:16 +0000] \"GET /blog/tags/projects HTTP/1.0\" 200 28370 \"http://www.semicomplete.com/blog/tags/C\" \"Mozilla/5.0 (compatible; archive.org_bot +http://www.archive.org/details/archive.org_bot)\"",
      "timestamp" => "04/Jan/2015:05:21:16 +0000",
       "referrer" => "\"http://www.semicomplete.com/blog/tags/C\"",
     "@timestamp" => 2020-06-17T01:23:47.817Z,
           "path" => "/data/hd05/elk/logstash-tutorial.log",
          "ident" => "-",
       "response" => "200",
       "@version" => "1",
        "request" => "/blog/tags/projects",
       "clientip" => "207.241.237.220",
          "agent" => "\"Mozilla/5.0 (compatible; archive.org_bot +http://www.archive.org/details/archive.org_bot)\"",
           "auth" => "-"
}

執行結果

2.11、案例二、nginx日誌解析

首先安裝nginx:nginx功能介紹和基本安裝

這裏我們也不使用filebeat,因為這篇文章只是介紹logstash

配置:

cd conf.d/ 
vim  nginx.conf
############nginx.conf配置如下#####################
input {
        file {
            path => "/usr/local/nginx/logs/access.log"
            type => "log"
            start_position => "beginning"
            }
    }
   
filter {
grok {
        match => { "message" => ["(?<RemoteIP>(\d*.\d*.\d*.\d*)) - %{DATA:[nginx][access][user_name]} \[%{HTTPDATE:[nginx][access][time]}\] \"%{WORD:[nginx][access][method]} %{DATA:[nginx][access][url]} HTTP/%{NUMBER:[nginx][access][http_version]}\" %{NUMBER:[nginx][access][response_code]} %{NUMBER:[nginx][access][body_sent][bytes]} \"%{DATA:[nginx][access][referrer]}\" \"%{DATA:[nginx][access][agent]}\""] }
        add_field => {
                "Device" => "Charles Desktop"
        }
        remove_field => "message"
        remove_field => "beat.version"
        remove_field => "beat.name"
    }
}
    output {    
             elasticsearch {
                    hosts => ["192.168.110.130:9200"]
                    index => "nginx-log-%{+YYYY.MM.dd}"
                }
    }

如上的配置中輸出到elasticsearch中,這裏沒有設置密碼,所以不需要用戶和密碼,還有就是這裏使用的默認模板,如果想要修改的話可以使用,可以添加如下配置:

user => "elastic"  #用戶
password => "${ES_PWD}" #通過keystore存儲的密碼
manage_template => false #關閉默認的模板
template_name => "elastic-slowquery" #指定自定義的模板

執行命令啟動logstash

cd logstash-7.7.0/ && ./bin/logstash -f conf.d/nginx.conf

執行的結果(查看elasticsearch集群):

 

 

 從上面的兩個圖看,一個創建了我們在nginx.conf中指定的一個索引,然後索引的內容都是解析出來的一些字段內容。

3.12、案例三、elasticsearch慢日誌解析

這是實例我們採用filebeat+logstash+elasticsearch,還有權限驗證進行試驗:

這裏主要是對elasticsearch的慢日誌查詢做解析,雖然我在一篇文章搞懂filebeat(ELK)中篇文章中直接通過filebeat的elasticsearch(beat版本)的模塊對其做過解析,但是解析的還是不夠特別完善,這裏引入logstash對其解析,過濾。

首先配置filebeat文件(這裏只配置了一個輸入和一個輸出,沒有做多餘的處理,只是用來收集日誌):

#=========================== Filebeat inputs =============================
filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log

  # Change to true to enable this input configuration.
  enabled: true

  # Paths that should be crawled and fetched. Glob based paths.
  paths:
    - /var/logs/es_aaa_index_search_slowlog.log
    - /var/logs/es_bbb_index_search_slowlog.log
    #- c:\programdata\elasticsearch\logs\*

#================================ Outputs =====================================

# Configure what output to use when sending the data collected by the beat.

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.110.130:5044","192.168.110.131:5044","192.168.110.132:5044"]
  loadbalance: true   #這裏採用負載均衡機制,

  # Optional SSL. By default is off.
  # List of root certificates for HTTPS server verifications
  #ssl.certificate_authorities: ["/etc/pki/root/ca.pem"]

  # Certificate for SSL client authentication
  #ssl.certificate: "/etc/pki/client/cert.pem"

  # Client Certificate Key
  #ssl.key: "/etc/pki/client/cert.key"

然後啟動filebeat:

cd filebeat-7.7.0-linux-x86_64 && ./filebeat -e

然後配置logstash的配置文件:

cd conf.d/ 
vim  es.conf
############es.conf配置如下##############
input {
        beats{
                port => 5044
        }
    }
   
filter {
grok {
        match => {"message" => "\[%{TIMESTAMP_ISO8601:query_time},%{NUMBER:number1}\]\s*\[%{DATA:log_type}\]\s*\[%{DATA:index_query_type}\]\s*\[%{DATA:es_node}\]\s*\[%{DATA:index_name}\]\s*\[%{NUMBER:share_id}\]\s*took\[%{DATA:times_s}\],\s*took_millis\[%{NUMBER:query_times_ms}\],\s*types\[%{DATA:types}\],\s*stats\[%{DATA:status}\],\s*search_type\[%{DATA:search_type}\],\s*total_shards\[%{NUMBER:total_shards}\],\s*source\[%{DATA:json_query}\],\s*extra_source"}
        remove_field => ["message","@version","status","times_s","@timestamp","number1"]
    }
  mutate{
        convert => {
        "query_times_ms" => "integer"
}
}
}
    output {    
             elasticsearch {
                    hosts => ["192.168.110.130:9200","192.168.110.131:9200","192.168.110.132:9200"]
                    index => "elastic-slowquery-222"
                    user => "elastic"
                    password => "${ES_PWD}"
                    manage_template => false
                    template_name => "elastic-slowquery"
                }
    }

如上配置使用了用戶的權限驗證,以及elasticsearch的自定義模板

啟動logstash

cd logstash-7.7.0/ && ./bin/logstash -f conf.d/es.conf

登錄es查看結果:

 

logstash就介紹到這裏了,如果有疑問多看官網比較好

 

參考:

logstash官網:https://www.elastic.co/guide/en/logstash/current/index.html

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

LeetCode 78,面試常用小技巧,通過二進制獲得所有子集

本文始發於個人公眾號:TechFlow,原創不易,求個關注

今天是LeetCode專題第47篇文章,我們一起來看下LeetCode的第78題Subsets(子集)。

這題的官方難度是Medium,點贊3489,反對79,通過率59.9%。從這個數據我們也可以看得出來,這是一道難度不是很大,但是質量很高的題。的確,在這道題的解法當中,你會學到一種新的技巧。

廢話不多說,我們先來看題意。

題意

這題的題意非常簡單,和上一題有的一拼,基本上從標題就能猜到題目的意思。給定一個沒有重複元素的int型數組,要求返回所有的子集,要求子集當中沒有重複項,每一項當中也沒有重複的元素。

樣例

Input: nums = [1,2,3]
Output:
[
  [3],
  [1],
  [2],
  [1,2,3],
  [1,3],
  [2,3],
  [1,2],
  []
]

照搬上題

剛拿到手可能有點蒙,但是稍微想一下就會發現,這一題和上題非常接近,兩者唯一的不同就是,子集沒有數量的限制,從空集開始,一直到它本身結束,不論多少個元素都可以。而上一題要求的是有數量限制的,也就是說上一題我們求的其實是限定了k個元素的子集。

想明白這點就簡單了,顯然我們可以復用上一題的算法,我們來遍歷這個k,從0到n,就可以獲得所有的子集了。只要你上一題做出來了,那麼這題幾乎沒有任何難度。如果你沒有看過上一題的文章的話,可以通過傳送門回顧一下:

LeetCode 77,組合挑戰,你能想出不用遞歸的解法嗎?

我們直接來看代碼:

class Solution:
    def subsets(self, nums: List[int]) -> List[List[int]]:
        # 上一題求解k個組合的解法
        def combine(n, k, ret):
            window = list(range(1, k+1)) + [n+1]
            j = 0
            
            while j < k:
                cur = []
                for i in range(k):
                    cur.append(nums[window[i] - 1])
                ret.append(cur[:])
                
                j = 0
                while j < k and window[j+1] == window[j] + 1:
                    window[j] = j + 1
                    j += 1
                window[j] += 1
                
        # 手動添加空集
        ret = [[]]
        n = len(nums)
        # 遍歷k從1到n
        for i in range(1, n+1):
            combine(n, i, ret)
        return ret

二進制組合

照搬上一題的解法固然是可行的,但是這麼做完全沒有必要,也得不到任何收穫。所以我們應該想一下新的解法。

既然這道題讓我們求的是所有的子集,那麼我們可以從子集的特點入手。我們之前學過,一個含有n個元素的子集的數量是。這個很容易想明白,因為n個元素,每個元素都有兩個狀態,選或者不選。並且這n個元素互相獨立,也就是說某個元素選或者不選並不會影響其他的元素,所以我們可以知道一共會有種可能。

我們也可以從組合數入手,我們令所有子集的數量為S,那麼根據上面我們用組合求解的解法,可以得到:

兩者的結果是一樣的,說明這個結論一定是正確的。

不知道大家看到n個元素,每個元素有兩個取值有什麼想法,如果做過的題目數量夠多的話,應該能很快聯想到二進制。因為在二進制當中,每一個二進制位就只有0和1兩種取值。那麼我們就可以用n位的二進制數來表示n個元素集合取捨的狀態。n位二進制數的取值範圍是,所以我們用一重循環去遍歷它,就相當於一重循環遍歷了整個集合所有的狀態。

這種技巧我們也曾經在動態規劃狀態壓縮的文章當中提到過,並且在很多題目當中都會用到。所以建議大家可以了解一下,說不定什麼時候面試就用上了。

根據這個技巧, 我們來實現代碼就非常簡單了。

class Solution:
    def subsets(self, nums: List[int]) -> List[List[int]]:
        ret = []
        n = len(nums)
        # 遍歷所有的狀態
        # 1左移n位相當於2的n次方
        for s in range(1 << n):
            cur = []
            # 通過位運算找到每一位是0還是1
            for i in range(n):
                # 判斷s狀態在2的i次方上,也就是第i位上是0還是1
                if s & (1 << i):
                    cur.append(nums[i])
            ret.append(cur[:])
            
        return ret

從代碼來看明顯比上面的解法短得多,實際上運行的速度也更快,因為我們去掉了所有多餘的操作,我們遍歷的每一個狀態都是正確的,也不用考慮重複元素的問題。

總結

不知道大家看完文章都有一些什麼感悟,可能第一種感悟就是LeetCode應該按照順序刷吧XD。

的確如此,LeetCode出題人出題都是有套路的,往往出了一道題之後,為了提升題目數量(湊提數),都會在之前題目的基礎上做變形,變成一道新題。所以如果你按照順序刷題的話,會很明顯地發現這一點。如果你從這個角度出發去思考的話,不但能理解題目之間的聯繫,還能揣摩出出題人的用意,這也是一件很有趣的事情。

如果喜歡本文,可以的話,請點個關注,給我一點鼓勵,也方便獲取更多文章。

本文使用 mdnice 排版

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

Probius:一個功能強大的自定義任務系統

斷更的這些日子,我又折騰了一個輪子,文末參考源碼

大約在一年半以前寫過一篇文章『探秘varian:優雅的發布部署程序』,裡邊有講到我們採用類似lego的模塊化方式來構建CICD的流程,雖能滿足我們的需求,但終究需要編寫代碼,使用成本有點高,不夠友好。近段時間終於下定決心將其重構,只為帶來更好的使用體驗,於是便有了這個項目Probius

Probius為遊戲星際爭霸里的角色,是一隻充滿好奇心的星靈探測機,取此名字的意思也是希望用戶能夠在這個系統中充分發揮想象,藉助此系統實現各種自定義的功能,覆蓋更多的運維場景

設計思路

Probius由三個關鍵詞構成:命令、模板、任務

命令:為系統中的最小粒度,可以是一個具體的linux命令,或者是一個腳本都可以

模板:模板為一組命令的集合

任務:模板為靜態的定義,而任務就是模板的執行,執行一個任務實際上就是去執行了一個模板內的所有命令

整體思想跟varian一樣,但不同的是可以僅僅通過web端的配置,就能實現各種各樣的功能,下邊具體介紹下如何配置的

頁面配置

新建命令,在這個頁面可以創建命令或者腳本

如果是單純的命令,直接在命令輸入框填寫即可,如果是需要執行腳本,則點擊腳本之後,會額外多出一個腳本輸入框,填寫要執行的腳本

理論上不限制腳本的類型,可以是shell、python或者go之類的,前提是系統上有腳本的運行環境,當命令或者腳本有參數的時候可以在參數列寫上參數名稱,然後在最終執行任務的時候需要傳遞具體參數的值過來

在命令執行完成后,會根據命令的返回狀態也就是$?的值來判斷命令是否執行成功,當$?為0是表示執行成功,否則表示執行失敗,如果是執行的腳本時,需要在腳本最後明確腳本返回狀態,shell腳本可以在腳本執行成功時通過exit指定退出狀態,例如

ls /ops-coffee.cn &&\
exit 0 ||\
exit 2

而對於python腳本則可以藉助sys.exit這樣寫

import sys

if 'www' in 'ops-coffee.cn':
  sys.exit(0)
else:
  sys.exit(3)

其他語言類似

模板的創建分為幾步,先創建一個模板

然後給模板添加任務

主要為選擇任務、確定執行順序、選擇執行主機以及執行用戶,添加完成后可以在模板詳情頁面看到關聯的命令

模板定義了一個完整的任務流程,定義完成后就可以執行任務了,執行任務界面寫的比較簡單

這界面主要給運維人員使用,定義任務名稱、所要執行的模板ID、以及參數,支持定時執行或者周期執行,只需要加上crontab參數即可,除了可以立即執行任務外,還可以將次任務保存為常用任務,後續在常用任務頁面可以直接執行

這個功能主要方便其他非運維人員使用本系統,同時也支持針對任務設置權限,可以將權限設置給某個用戶組,那麼則只有這個組內的成員可以看到並執行任務了

任務執行后可以通過任務歷史查看任務執行詳情,在這個頁面可以清晰的看到任務執行到了哪一步,是成功還是失敗

可以點擊每一步任務後邊的日誌查看實時日誌輸出

寫在最後

如果你用過我們開源的一站式DevOps平台CODO的話,會發現這個系統跟CODO的TASK模塊非常像,是的沒錯,這個設計與CODO的TASK如出一轍,但開源的CODO任務模塊要更加強大,例如支持分組執行、支持任務重做、支持人工干預等等

TASK的源碼在這裏:https://github.com/opendevops-cn/codo-task,感興趣可以自行閱讀部署,需要注意的是CODO為微服務架構,單獨安裝TASK是無法正常運行的,具體部署方法參考官方文檔

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化

研究:為挽救蜜蜂的二代農藥「速殺氟」會扼殺熊蜂後代

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心

碳儲存技術突破 吸收空氣中二氧化碳的菱鎂礦 數天內就可合成

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※想知道購買電動車哪裡補助最多?台中電動車補助資訊懶人包彙整

南投搬家公司費用,距離,噸數怎麼算?達人教你簡易估價知識!

※教你寫出一流的銷售文案?

※超省錢租車方案

碳交易收益回饋弱勢 加州的能源正義之路

環境資訊中心綜合外電;姜唯 編譯;林大利 審校

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※Google地圖已可更新顯示潭子電動車充電站設置地點!!

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※別再煩惱如何寫文案,掌握八大原則!

網頁設計最專業,超強功能平台可客製化

留住庇里牛斯原生熊最後血脈 法國將野放母熊

摘錄自2018年8月24日中央社報導

法國政府規劃在南部庇里牛斯山區野放兩頭母熊以促進繁衍。這片山區目前只有43頭熊,其中一頭保有當地原生熊的最後血脈,即使畜牧業者反對,政府對野放的態度也是勢在必行。

根據法國國家狩獵及野生動物局(ONCFS)於2017年調查,庇里牛斯山區還有43頭熊,但棲息地分布不平衡,主要集中在山區的中部和東部,西部只有兩頭公熊。

因此,生態部規劃今年秋天把兩頭生於斯洛維尼亞的母熊引入庇里牛斯山西北部的貝亞納(Bearn)地區,讓物種有機會繁衍。

庇里牛斯山-大西洋省(Pyrenees-Atlantiques)和奧克西塔尼大區(Occitanie)警署於今年6月到7月調查約6000名網路使用者的意願,結果顯示多達88.9%的受調者贊成引入兩頭熊到庇里牛斯山,只有8.9%不贊成。

在所有受調者中,約27%是庇里牛斯山區附近省份的居民,這些人有71.6%贊成野放,25.4%反對;若只看貝亞納地區的「當事人」意見,贊成比率降到58.6%,但仍超過半數。

法國政府今年5月公布2018年到2028年「庇里牛斯山區棕熊保育計畫」,未來10年內可能會執行不只一次的野放計畫,若牧羊人因熊攻擊而蒙受損失,也會予以賠償。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

網頁設計公司推薦不同的風格,搶佔消費者視覺第一線

※廣告預算用在刀口上,台北網頁設計公司幫您達到更多曝光效益

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

南投搬家公司費用需注意的眉眉角角,別等搬了再說!

※教你寫出一流的銷售文案?

麥加朝聖減環境足跡 綠色朝覲漸扎根

摘錄自2018年8月23日中央社報導

200多萬名穆斯林到麥加朝聖接近尾聲,數以千計清潔工忙著在垃圾當中將塑膠分離出來。這是全世界最大型的年度盛會之一,對沙烏地阿拉伯帶來巨大環保挑戰。

聖城麥加附近的米納(Mina)馬穆尼亞營地(Mamuniya)放置幾個不同顏色的大桶子:黑色桶收集有機垃圾,藍色桶回收鋁鐵罐和塑膠,這些都是為了減少朝覲環境足跡的作法。

麥加市公共衛生部門主管薩阿迪(Mohammed al-Saati)指出,到伊斯蘭教第一聖地朝聖期間,製造出的垃圾量超過4萬2000公噸。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※想知道最厲害的網頁設計公司"嚨底家"!

※別再煩惱如何寫文案,掌握八大原則!

※產品缺大量曝光嗎?你需要的是一流包裝設計!

南非東岸鑽探石油 「活化石」腔棘魚恐絕種

摘錄自2018年08月26日蘋果日報南非報導

「活化石」腔棘魚(Coelacanth)有絕種危機!腔棘魚是南非最瀕危的魚類,南非東海岸有約30條,牠也是世上最稀有魚類,但義大利能源公司Eni計劃在發現腔棘魚水域附近勘探開採石油,危及牠們的將來。

英國《衞報》報導,腔棘魚是史前魚類,科學家原以為腔棘魚早已絕種,直至1938年在南非東岸海域才被重新發現,並在50年代於非洲島國科摩羅(Comoros)再發現數條,才證實沒有絕種。2000年在南非發現一條細小腔棘魚,令科學家大為興奮。

不過義大利能源公司Eni最近計劃開展400公里長的勘探,而腔棘魚的棲息地在勘探邊界外約40公里,距離鑽探場地以北近200公里。

腔棘魚專家布魯頓(Mike Bruton)指魚類對環境干擾很敏感,「任何干擾牠們吸氧能力的外物,如油污均會威脅牠們的生存。勘探期間出現漏油或井噴的風險及以後的商業活動,令人嚴重關切」。

去年Eni曾作環境評估,但報告提及對腔棘魚影響的範圍很小,相反報告指在油井旁沒有太大可能發現腔棘魚,又指已經針對海洋生態和漏油的專家研究,發現兩者沒有相關威脅。

本站聲明:網站內容來源環境資訊中心https://e-info.org.tw/,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※別再煩惱如何寫文案,掌握八大原則!

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※超省錢租車方案

※教你寫出一流的銷售文案?

網頁設計最專業,超強功能平台可客製化