Tuesday, July 18, 2023

Logstash

Introduction
The explanation is as follows
What Is Logstash?
Logstash is an open-source data processing pipeline from Elastic. It is used to ingest, transform, and ship data to different destinations, including Elasticsearch, Kafka, flat files, etc.

A Logstash pipeline includes three different stages (a minimal example follows the list):

  1. Input: the data source from which the data is collected for ingestion.
  2. Filter: transforms (cleans up, aggregates, etc.) the data using plugins like Grok, Mutate, Date, etc.
  3. Output: the destination for the ingested data (Elasticsearch, flat files, a database, etc.).
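A minimal configuration that shows all three stages together might look like this (a sketch only; stdin and stdout are used so it can be tried without any external system, and the added field is purely illustrative):
input {
  stdin { }                      # read events typed on the console
}

filter {
  mutate {
    add_field => { "pipeline" => "demo" }  # illustrative static field
  }
}

output {
  stdout { codec => rubydebug }  # pretty-print the processed events
}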
The logstash.conf File
Example - Postgres to Elastic
The explanation is as follows
Below are the prerequisites to send data using Logstash to Elastic:

1. Logstash is installed on the system with a JDBC driver for Postgres.
2. A Postgres database with a table or function to sync.
3. An Elasticsearch instance is running.
We do it like this. Here, incremental ingestion is performed. The job runs once a minute, as specified in the schedule field. The tracking_column value is "updated_at" and its type is timestamp. In this way, data is transferred to Elastic page by page
input {
    jdbc {
        jdbc_driver_library => "c:/logstash/jdbc/postgresql.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_connection_string => "${JDBC_HOST}"
        jdbc_user => "${DB_USER}"
        jdbc_password => "${DB_PWD}"
        jdbc_paging_enabled => true
        jdbc_page_size => 1000
        schedule => "* * * * *"  # schedule to run every minute
        statement => "SELECT * FROM employee WHERE updated_at > :sql_last_value"
        use_column_value => true
        tracking_column => "updated_at"
        tracking_column_type => "timestamp"
        last_run_metadata_path => "c:/logstash/employee.tracker"
    }
}

filter {
    mutate {
        remove_field => ["date", "@timestamp", "host"]
    }

    # Example of parsing JSON fields if needed
    json {
        source => "first_name"
        target => "name"
    }
}

output {
    stdout { codec => json_lines }
    elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "my_table_index"
        custom_headers => {
            "Authorization" => "${AUTH_KEY}"
        }
        document_id => "%{table_id}" # Unique identifier from the table
        timeout => 120
    }
}
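The configuration above assumes an employee table that exposes both the tracking column and a unique identifier. A minimal sketch of such a table (the column names table_id, first_name, and updated_at are taken from the config; everything else is illustrative):
CREATE TABLE employee (
    table_id   SERIAL PRIMARY KEY,               -- used as the Elasticsearch document_id
    first_name TEXT,
    updated_at TIMESTAMP NOT NULL DEFAULT now()  -- incremental tracking column
);
Note that updated_at must actually be refreshed on every change (for example, via a trigger); otherwise modified rows will never be newer than :sql_last_value and will be skipped.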
Example - Reading from Filebeat
We do it like this
input {
  beats {
    port => "4561"
    host => "0.0.0.0"
  }
}

filter { }

output {
  elasticsearch {
    hosts => ["0.0.0.0:9200"]
    index => "app-logs-%{+YYYY.MM.dd}"
  }
}
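On the Filebeat side, the matching piece is an output.logstash section pointing at the port opened above. A minimal filebeat.yml sketch (the log path is hypothetical, and the hosts entry assumes Filebeat can reach Logstash on localhost):
filebeat.inputs:
  - type: log
    paths:
      - /var/log/app/*.log      # hypothetical application log path

output.logstash:
  hosts: ["localhost:4561"]     # must match the beats input port above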
Example - stdout + Elasticsearch
We do it like this
# Sample Logstash configuration for creating a simple
# File -> Logstash -> Elasticsearch pipeline.

input {
  file {
    path => "<Log File Full Path>"
    start_position => "beginning"
  }
}

output {
 stdout {
    codec => rubydebug
  }
  elasticsearch {
    hosts => ["https://localhost:9200"]
    ssl_certificate_verification => false
    ssl => true
    index => "elkdemoindex"
    user => "elastic"
    password => "<Elastic Search Password>"
  }
}
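To try this configuration, Logstash can be started with the -f flag pointing at the file (the exact binary location depends on the installation):
bin/logstash -f logstash.conf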
Example - Reading from TCP
We do it like this
input {
    tcp {
        port => 4560
        codec => json_lines
    }
}

filter { }

output {
    elasticsearch {
        hosts  => ["http://elasticsearch:9200"]
        index  => "operationlog-%{app_name}-%{+YYYY-MM-dd}"
    }
}
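This input is typically fed by an application that writes JSON lines over TCP, for example a Spring Boot service using the logstash-logback-encoder library. A Logback appender sketch (the destination and the app_name value are assumptions; app_name has to be supplied by the application because the index name above references it):
<appender name="LOGSTASH" class="net.logstash.logback.appender.LogstashTcpSocketAppender">
    <destination>localhost:4560</destination> <!-- must match the tcp input port -->
    <encoder class="net.logstash.logback.encoder.LogstashEncoder">
        <!-- adds app_name to every event; referenced in the index pattern -->
        <customFields>{"app_name":"my-app"}</customFields>
    </encoder>
</appender>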


