Mostrando entradas con la etiqueta ELK stack. Mostrar todas las entradas
Mostrando entradas con la etiqueta ELK stack. Mostrar todas las entradas

lunes, 7 de diciembre de 2020

ELK - Creando visualizaciones simples

Hace demasiado que no publicaba una nueva entrada sobre la pila ELK, la última entrada tiene más de un año, así que hoy vamos con un nuevo post en el que vamos a utilizar las capacidades de ingesta de Elasticsearch y, con los datos que tengamos, realizaremos una gráfica simple. 
 
En las entradas anteriores enviabamos datos a Logstash, para procesarlos y crear documentos con los campos necesarios para posteriormente almacenarlos en un índice de Elasticsearch. Además, también vimos como crear un índice y definíamos el tipo de dato específico de los campos que nos interesan.

Desde entonces ha llovido mucho y aunque continuamos con una configuración similar, vamos a introducir un pequeño cambio y usaremos un nodo de elasticsearch como nodo de ingesta, en el cual podremos definir un pipeline para procesar los datos que recibimos sin necesidad de usar Logstash. Básicamente, lo que estamos haciendo es pasar la funcionalidad de Logstash directamente al cluster de Elastic.
 
Para esto, vamos a empezar de forma sencilla analizando los mensajes que vamos a recibir desde un nodo que nos envía información mediante filebeat, la cual vamos a generar usando syslog_generator:
 
Mensajes recibidos desde filebeat.
 
Como vemos, filebeat envía una gran cantidad de campos que no nos interesan, ya que solo queremos quedarnos con el campo message que contiene el mensaje real creado por syslog_generator.
 
Vamos a empezar de forma simple creando un pipeline en Elastic que elimine todos esos campos que nos sobran. Para este tipo de tareas, como siempre, lo mejor es utilizar la consola de desarrollo de Kibana. Veamos directamente el pipeline para analizarlo posteriormente:
 
Pipeline básico de eliminación de campos.

Con este pipeline definimos que queremos eliminar una serie de campos de los mensajes recibidos, en concreto los que aparecen en el campo field dentro del procesador remove, que podemos ver en la imagen superior.

Una vez definido un pipeline, este queda almacenado en el cluster, pero para usarlo necesitamos asociarlo a un índice. Para seguir con nuestras pruebas, lo que hacemos es editar el índice que se ha creado automáticamente en el cluster en cuanto ha empezado a recibir datos de los nodos con filebeat y modificamos su atributo index.default_pipeline. Este atributo establece que pipeline hay que aplicar a los documentos recibidos antes de almacenarlos en el índice. Podemos editarlo desde la opción Edit Settings de la sección Index Management:

Modificación de las propiedades del índice.

Al especificar que debe aplicarse el pipeline a los documentos antes de almacenarlos, todos los campos que hemos indicado dentro del procesador remove del pipeline son eliminados, con lo que ahora podemos ver lo siguiente desde la sección Discover para el patrón del índice generado por filebeat:

Documentos en el índice filebeat tras pasar por el pipeline.

Como vemos, ahora cada documento se ha almacenado en el índice sin los campos que hemos eliminado anteriormente.

Para llegar a este punto hemos tenido que modificar manualmente el índice que se ha creado de forma automática pero, como esto no es muy operativo que se diga, empecemos por establecer en la configuración de filebeat que pipeline debe utilizarse estableciendo la opción pipeline en el fichero filebeat.yml de nuestro nodo filebeat origen:

Configuracion de filebeat aplicando el pipeline.

Por tanto, como vemos, podemos aplicar el pipeline a la salida configurada de filebeat en origen directamente. Al hacerlo comprobamos que todos los campos que hemos configurado en el procesador remove del pipeline remove_extra_fields, no se han almacenado en los documentos del índice filebeat.

Por tanto, ahora que tenemos claro que podemos pasar cierta funcionalidad de Logstash directamente a Elasticsearch, pasemos a hacer un pipeline que procese nuestro campo message adecuadamente, nos devuelva los campos que necesitamos, elimine el resto y para finalizar, escriba los documentos en un índice diferente, donde estableceremos los mapeos necesarios para cada campo, asegurándonos que el tipo de dato de cada uno se almacena de forma correcta.

El pìpeline que realiza toda la manipulación de campos que necesitamos es el siguiente:

Pipeline de eliminación de campos y procesado del campo message.

Como en la imagen anterior no queda muy claro, podéis encontrar el pipeline completo, junto con la creación del índice definitivo, anexado con este post.

Ya hemos comprobado que este pipeline modifica el documento, eliminando aquellos campos que no necesitamos y crea los nuevos campos a partir del procesado del campo message original. A continuación crearemos un índice especificando los campos que necesitamos en cada documento y, lo más importante, con el tipo de dato correcto. Ya sabemos que, para crear un índice, lo mejor es usar la consola de desarrollo de Kibana. Podemos crear el índice de una manera similar a la siguiente:

Creación del índice.

Una vez creado nuestro nuevo índice, creamos el index pattern necesario para que Kibana pueda obtener datos de Elasticsearch. Para esto, desde el menú Management, en la sección Index Patterns de Kibana, creamos el nuevo patrón especificando que el campo @timestamp es el que contiene la información de fecha y hora para poder hacer el filtrado por tiempo:

 

Creación del index pattern correspondiente al nuevo índice.

Seleccion del campo de fecha del index pattern.

Es importante que nos demos cuenta que el campo @timestamp de cada documento contiene la fecha y hora de recepción de la información enviada por filebeat, la cual no es la misma que aparece en el campo message. Para corregir esto, el pipeline hace una conversión del campo EVENT_TIMESTAMP que hemos construido con el procesador dissect y copia dicha información en el campo @timestamp. Esta operación de conversión la realiza el siguiente procesador:

Modificación del campo @timestamp.

A continuación cambiamos la configuración de filebeat en el nodo origen para especificar el nombre del pipeline que deseamos usar antes de almacenar los documentos en el índice:

Configuración definitiva de filebeat.

Con toda la configuración ya realizada, podemos comprobar como tenemos datos en el índice en los campos deseados y con el tipo de dato correcto:

Datos del indice temperature_sensors.

Ya con nuestros datos, podemos pasar a hacer una representación gráfica simple de los valores de temperatura recibidos desde la sección Visualize. Podemos crear una gráfica de tipo lineal para ver los valores medios de temperatura:

Valores medios de temperatura.

Esta representación nos muestra los valores medios de temperatura para cada intervalo temporal que seleccionemos. Para esto establecemos que en el eje Y queremos el valor medio del campo TEMPERATURE_VALUE de cada documento y, en el eje X establecemos un histograma basado en el campo de fecha y hora @timestamp.

El problema de esta visualización, es que estamos obteniendo el valor medio de los valores de temperatura enviados por tres sensores diferentes. Para mostrar en la misma gráfica el valor medio de temperatura, por cada uno de los sensores que estamos simulando con syslog_generator, tenemos que añadir filtros en el eje X para los identificadores de cada uno de los sensores. Esto podemos hacerlo más o menos del siguiente modo:

Filtro por cada sensor.

Al realizar esta configuración, pasamos a tener una gráfica como la siguiente:

Valor medio de temperatura por cada sensor.

Es importante tener en cuenta que cada uno de los valores que vemos en cada gráfica, se corresponden con el valor medio de todos los valores recibidos en ese intervalo de tiempo. Si aumentamos la resolución, disminuyendo el tiempo de representación de la gráfica, podemos ver algo como lo siguiente:

Valores de temperatura por segundo de cada sensor.

En esta gráfica ya tenemos una resolución de un segundo, correspondiente a la tasa de envío de información que he configurado en syslog_generator para simular la información de temperatura de cada sensor simulado.

Por tanto y en resumen, hemos visto como crear un pipeline que nos permite procesar los documentos que llegan a Elasticsearch, sin necesidad de utilizar Logstash, hacer las modificaciones necesarias para obtener los campos que nos interesan y almacenar dichos documentos en un índice diferente.

Además, basándonos en dichos datos, hemos creado una gráfica simple en la que hemos podido aplicar filtros para diferenciar entre diferentes fuentes del mismo índice.

En el siguiente enlace podéis encontrar el fichero que contiene la definición del pipeline así como la del índice utilizados a lo largo del post.

sábado, 9 de noviembre de 2019

ELK - Creación de índices y mapeo de campos.

Hoy, tras las entradas anteriores, continuamos trabajando con Elastic para revisar un punto muy importante sobre los campos y el tipo de dato de los mismos. Veremos que en algunos casos será necesario fijar el tipo de dato de un campo, y como podemos reindexar los datos ya disponibles en caso de ser necesario.

En todas las entradas sobre Logstash hasta la fecha hemos enviado los registros de log, generados por syslog_generator, directamente a Logstash donde hemos hecho el mapeo automático de los campos de origen.

Fijándonos en los campos que enviamos mediante syslog_generator, vemos un punto muy interesante sobre los campos de fecha que hemos mapeado al analizar el índice. Podemos consultar el mapeo de campos directamente desde la sección Mapping del índice en cuestión, que podemos encontrar en Index Management:

Mapeo por defecto de campos.
Como podemos ver, los dos campos de hora y fecha que hemos definido procedentes de nuestros hosts son de tipo texto, no de tipo fecha como sería lo correcto. Además, como recordaremos de un post anterior, al crear el patrón sobre el índice que contiene los documentos de Logstash, vimos que usábamos por defecto el campo @timestamp que contiene la fecha y hora de recepción del documento y su introducción en el índice, no la fecha y hora real de generación del evento. Por tanto, ¿que tenemos que hacer para asegurarnos que un campo se mapea con el tipo de dato correcto? y más importante aún ¿puedo cambiar el tipo de dato de un documento ya existente en un índice?

Revisando el resto de campos del índice podemos ver que todos los campos se han mapeado como una cadena, por ejemplo tempsensor_temperature que debería mapearse como un tipo integer. 

Por tanto, salvo que los campos de nuestros documentos sean siempre de tipo texto, está claro que es necesario crear nuestros índices previamente, analizando la información que queremos procesar con Elastic para establecer los mapeos de los campos correctamente y fijando el tipo de dato de los mismos cuando sea necesario.

En un caso como este, en el cual ya tenemos un índice cuyos documentos necesitamos mantener, vamos a tener que realizar una operación de reindexación, en la cual básicamente copiamos el contenido de un índice en otro índice diferente.

Empecemos creando nuestro nuevo índice con los campos mapeados a un valor correcto, para esto lo más sencillo es que usemos directamente la consola interactiva disponible en el menú Dev Tools:

Consola de desarrollo de Elasticsearch.
Comencemos creando un índice nuevo con las mísmas características del índice existente que necesitamos reindexar. Para esto lo más sencillo es que copiemos y peguemos las características del índice ya existente, las cuales podemos obtener utilizando la operación _search con un comando GET NombreDelIndex desde la consola:

Configuración del índice existente.
Copiamos la salida del comando anterior y la pegamos en la consola, cambiando el tipo de dato de los mapeos de los campos que necesitamos así como el nombre del índice. Es importante que definamos el formato de mapeo de los campos de manera correcta o, como veremos, recibiremos un error al realizar la operación de reindexado. Para empezar establecemos el siguiente formato para nuestros campos de fecha y hora:

Especificación de formato para el campo tempsensor_time.

Especificación de formato para el campo tempsensor_timestamp.

El formato de fecha especificado para el campo tempsensor_timestamp es un formato de fecha personalizado, construido a partir de la información dada por la clase DateTimeFormatter de Java, cuya documentación podéis consultar en el siguiente enlace.

Para el campo tempsensor_timestamsp es tan sencillo como definir el campo en el nuevo índice como tipo integer.

Especificación del campo tempsensor_temperature.

Por último cambiamos el tipo de operación a PUT y, si hemos realizado la configuración correctamente, el resultado será más o menos el siguiente:

Resultado de la creación del nuevo índice.
Ahora nuestro nuevo índice aparece en la sección Index Management siendo el número de documentos disponibles cero:

Nuevo índice ya disponible para su uso.
A continuación reindexaremos los documentos existentes en nuestro índice actual, para lo cual usaremos la operación reindex del API de Elasticsearch. Desde la consola de desarrollo lanzamos la operación de reindexado del siguiente modo:

Operación de reindexado.
En caso de utilizar un formato incorrecto, o en nuestro caso al especificar un formato de fecha incorrecto, recibiríamos el siguiente mensaje:

Error de la operación dereindexado debido a un formato incrrecto.
Para utilizar un formato de fecha correcto, os recomiendo consultar la documentación de Elasticsearch en este enlace que nos explica el tipo de dato date, así como este otro enlace que muestra los tipos de formatos de fecha predefinidos que podemos utilizar. Adicionalmente podremos crear un formato de fecha para el tipo date siguiendo la sintaxis dada por la clase DateTimeFormatter de Java.

Tras realizar la operación de reindexado y al crear el index pattern para nuestro nuevo índice, ya vemos la primera diferencia con el índice anterior. Durante la creación del index pattern para Kibana, ya tenemos disponibles nuestros campos de fecha y hora para ser usados como filtros de tiempo para mostrar y analizar los datos:

Selección del filtro de tiempo durante la creación del index pattern.
 
Al terminar la creación del index pattern ya podemos consultar los datos correspondientes a nuestro nuevo índice, obteniendo el siguiente resultado:

Documentos del índice reindexado.
Como podemos ver, Kibana ha completado los nuevos campos con la fecha, en el caso del campo tempsensor_time y con la hora para el campo tempsensor_timestamp.

Empecemos por corregir los campos que contienen la hora y fecha de cada documento. Para evitar este comportamiento solo necesitamos editar cada uno de estos campos desde el menú Index Patterns, dentro de la sección Management, para eliminar la fecha en el caso del campo tempsensor_time y la hora, en el caso del campo tempsensor_timestamp. Podemos verlo en las siguientes imágenes:

Búsqueda de campos en el index pattern de Kibana.
Al pinchar sobre la opción Edit de cada uno de los campos de tipo date, podremos cambiar como se muestran y, por tanto, eliminar la fecha en el caso del campo tempsensor_time:

Corregimos el formato de represenatción del campo tempsensor_time.
Y eliminar la hora en el caso del campo tempsensor_timestamp:

Corregimos el formato de represenatción del campo tempsensor_timestamp.
Con lo que ahora, al volver a la sección Discover ya vemos los campos de fecha y hora correctamente, además de que el campo tempsensor_temperature ya se reconoce de tipo numérico:

Datos del nuevo índice con los campos ya corregidos.
Como hemos visto, Elastic nos proporciona herramientas para manejar los documentos de índices ya existentes. Esto nos permite corregir errores en los mapeos de los campos y cambiarlos de tipo, cuando sea necesario. Evidentemente esto puede ser bastante problemático si tenemos índices con millones de documentos, con lo que es muy importante que estudiemos detenidamente los datos que vamos a enviar antes de crear el índice para evitar tener que realizar este tipo de operaciones.


sábado, 6 de julio de 2019

ELK - Introducción a Logstash IV

Tras las entradas anteriores, vamos a crear un pipeline en el que procesaremos dos entradas diferentes procedentes de dos hosts distintos, generadas ambas con syslog_generator, y que enviaremos a Elasticsearch.

La definición de pipelines en Logstash se establece en el fichero pipelines.yml, el cual por defecto tiene la siguiente configuración:

Fichero de configuración pipelines.yml.
Como podemos ver, por defecto hay definido un pipeline denominado main y cuya configuración estará en la ruta indicada en el fichero dado por la opción path.config.

En este fichero podremos indicar opciones de configuración para cada pipeline, así como la ruta en la que se encontrarán los ficheros de configuración de cada uno de ellos. Hay múltiples opciones de configuración disponibles que podremos establecer para cada pipeline de forma individual, siendo algunas de las más interesantes:
  • pipeline.workers. La configuración por defecto de Logstash está optimizada para un único pipeline, con lo que habrá competición por los recursos del sistema si configuramos múltiples pipelines. Cada una de ellas usará un worker por cada un de los cores de CPU disponibles, siendo por tanto recomendable reducir el número de workers de cada pipeline y ajustar el número en función del rendimiento del sistema y CPUs disponibles.
  • queue.type. Logstash utiliza colas basadas en memoria para almacenar los eventos entre las diferentes fases de un pipeline. Como es lógico, en caso de fallo, los datos que se encuentren en memoria se perderán. Para evitar esto, se puede configurar el tipo de cola como persisted para almacenar los eventos en disco, aunque es importante resaltar que determinados plugins de entrada no pueden ser protegidos contra la pérdida de datos, independientemente del tipo de cola usado.
Para este ejemplo vamos a crear dos pipelines diferentes, uno procesará datos de tipo FIXED y otro datos de tipo TEMP, ambos generados por syslog_generator. La estructura de cada uno de estos eventos será:
Tipo FIXED.
Tipo TEMP.
Con todo esto, pasamos a establecer la siguiente configuación de pipelines en Logstash:

Configuración de múltiples pipelines.
Siendo la configuración de cada pipeline la siguiente:

Configuración de pipeline para eventos de tipo TEMP.
Configuración de pipeline para eventos de tipo FIXED.
Como vemos en los ficheros de configuración anteriores, la diferencia principal con lo que habíamos hecho hasta ahora es la sección output de ambos pipelines. En ambos establecemos que el plugin de salida es elasticsearch, que nos permitirá enviar la salida de cada piepline a elasticsearch. Hay varias opciones de configuración disponibles para este plugin, pero de momento solo usaremos la opción que nos permite especificar el host de destino.

También conviene destacar que cada pipeline levanta un listener de entrada en el host Logstah, con lo que es necesario cambiar los puertos en el plugin de entrada si necesitamos usarlo para diferentes orígenes.

Con esta configuración establecida, cuando arrancamos syslog_generator en los nodos fuente, estos comenzarán a enviar datos a Logstash, cada pipeline aplicará el filtro correspondiente y enviará los datos procesados a Elasticsearch.

Al estar usando la configuración por defecto, veremos que se crea un índice identificado por la cadena logstash-YYYY.MM.DD, lo cual podemos comprobar desde Kibana, en la sección Index Management:

Lista de índices disponibles en Elasticsearch.

Para poder trabajar con los documentos de estos índices, solo tenemos que establecer un patrón desde la sección Index Patterns de Kibana, lo que nos permitirá comenzar a trabajar con los datos disponibles. En este caso, la forma más sencilla de crear el patrón sería:

Creación de un index pattern para los datos procesados en Logstash.
Como vemos, al establecer un patrón que coincide con algún índice existente, estos se muestran para que podamos elegir el índice deseado, o bien aplicarlo a todos aquellos que cumplan con el patrón. En el siguiente paso de configuración, Kibana nos preguntará que campo queremos emplear como registro de tiempo para cada evento y seleccionaremos el que nos ofrece por defecto, identificado como @timestamp.
  
Tras la configuración, podremos comprobar cuales son los campos disponibles en la siguiente pantalla:

Lista de todos los campos disponibles en el índice logstash.
Con esta configuración realizada, desde la sección Discover podremos empezar a trabajar con los datos recibidos desde los hosts. Por ejemplo, podemos filtrar la información disponible de todos los documentos y seleccionar los campos tempsensor_temperature y tempsensor_time de entre todos los campos disponibles en los documentos del índice. Para eso solo es necesario que seleccionemos que campos queremos mostrar dentro del apartado Selected fields:

Búsqueda de campos en Kibana.
De este modo, Kibana nos mostrará solamente los campos que hemos seleccionado de todos los documentos del índice en el período de tiempo específicado.

También desde aquí podemos realizar una búsqueda usando el lenguaje de queries de Kibana, para esto solo tenemos que especificar el campo o campos que queremos buscar. Por ejemplo, podemos especificar que busque aquellos documentos en los que existan los campos tempsensor_temperature y tempsensor_time, siendo el resultado el siguiente:

Búsqueda de documentos usando una query.
Al hacer esta búsqueda, ahora solo tendremos aquellos documentos que cumplan la condición de la query que hemos especificado. Del mismo modo, si queremos que nos muestre aquellos documentos donde el campo tempsensor_temperature se encuentre entre dos valores, podemos hacer una query como la siguiente:

Búsqueda de documentos usando una query simple.
Como podemos ver, Kibana solo muestra los documentos que cumplen con la condición establecida en nuestra query.

En resumen, hemos podido ver como configurar Logstash para enviar datos a Elasticsearch y crear un índice en Kibana para poder realizar búsquedas simples en nuestros documentos de prueba.

En las próximas entradas veremos como podemos realizar gráficas, la importancia del mapeo de los campos de nuestros documentos a la hora de crear los índices y como mejorar la configuración de los pipelines.

sábado, 18 de mayo de 2019

ELK - Introducción a Logstah III

En el post anterior sobre Logstash, vimos que los dos filtros más utilizados para registros de tipo texto son dissect y grok. Usando syslog_generator, el cual os recuerdo que tenéis disponible aquí por si queréis usarlo, creamos entradas de tipo fijo para el syslog de una máquina donde habíamos instalado filebeat, las enviamos a Logstash y las procesamos con un filtro dissect para obtener los campos que nos interesaban.

En esta entrada usaremos grok sobre el mismo tipo de registros, enviados también desde un sistema con filebeat, para poder realizar la misma tarea.

Primero es importante recordar que grok es perfecto para procesar registros de tipo texto no estructurados, es decir, para aquellos casos en los que se generan registros con una estructura variable de línea a línea. Además, al basarse en expresiones regulares, es posible extender los patrones reconocidos creando patrones para poder usarlos en cualquier pipeline de Logstash que utilice grok.

La sintaxis básica para definir un filtro con grok se define del siguiente modo:
Sintaxis básica para un filtro grok.
Y esta definición, ¿que quiere decir exactamente? Como ya hemos visto, a diferencia de dissect, grok se basa en expresiones regulares ya existentes, identificadas con un nombre que se corresponde con el campo PATTERN, las cuales asignaremos al campo NAME que necesitemos cuando la coincidencia sea satisfactoria. Grok incluye un buen número de patrones ya listos para su uso, cuya lista puede consultarse aquí aunque pueden incluirse nuevos patrones, si es necesario, modificando la configuración de Logstash añadiendo un fichero con la definición de nuevos patrones.

Al definir filtros usando grok, este busca coincidencias desde el principio de cada línea de tecto hasta el final de la misma intentando mapear todo el evento, o hasta no encontrar una coincidencia. En función de los patrones, es posible que grok procese los datos varias veces, por lo que es más lento y requiere más recursos que filtros realizados con dissect.

Para nuestro ejemplo, vamos a seguir con líneas de registro que son perfectas para el uso de dissect, ya que no varían y presentan los mismos campos y delimitadores en todos los casos, pero así podremos comprobar que podemos usar indistintamente cualquiera de los dos tipos de filtro.

Las líneas a procesar presentan la siguiente estructura:
Líneas de registro a procesar y definición de campos.
Al igual que hicimos con el filtro dissect, vamos a empezar con un filtro simple y luego iremos complicándonos un poco más. 

Para empezar, vamos a hacer uso de una herramienta muy útil disponible en Kibana y que nos permite probar nuestros filtros grok, con lo que podemos analizar si el resultado obtenido es o no correcto antes de configurarlo en nuestro pipeline. Podemos acceder al Grok Debugger desde la sección Dev Tools de Kibana:

Grok Debugger disponible en Kibana.
Esta herramienta es de gran ayuda ya que, al tratarse de expresiones regulares, es fácil que tengamos que probar bastantes veces antes de dar con la configuración correcta del filtro. Por tanto, para empezar, vamos a probar a quedarnos solo con 4 campos de la línea message, en concreto con la fecha de syslog, el hostname, el nombre del programa que ha generado la línea y el mensaje generado por syslog_generator. Para esto, el filtro grok a aplicar sobre la línea message será:

Filtro grok inicial.
Como vemos en el filtro, puede que en algunos casos sea necesario definir los espacios que existan en el registro de texto, dados por el patrón SPACE. Teniendo esto en cuenta, el filtro está estableciendo lo siguiente:
  • Almacenar en el campo logger_date el texto encontrado que haga match con el patrón SYSLOGTIMESTAMP.
  • Almacenar en el campo logger_hostname la siguiente palabra encontrada.
  • Almacenar en el campo logger_program los siguientes datos encontrados.
  • Almacenar en el campo logger_message todo lo que queda hasta el final de la línea, lo cual especifcamos con el patrón GREEDYDATA.
Usando el grok debugger con este filtro, sobre una línea de ejemplo recibida en Logstash, podemos simular el resultado y comprobar que, al trabajar con expresiones regulares, los resultados que buscamos son un poco más complicados de obtener de lo que esperábamos:

Usando Grok Debugger con el filtro inicial.
Como vemos en la imagen anterior, el campo logger_program se queda vacio, es decir, que la definición %{DATA:logger_program} que hemos hecho para ese campo, no nos devuelve ningún contenido. Si vemos la definición del patrón DATA disponible, vemos que equivale a .*? lo cual establece que debe devolver la menor cantidad posible de caracteres (?) de cualquiera de los caracteres (.*) anteriores. Básicamente, al especificar que se devuelva la menor cantidad posible de caracteres con ?, no está devolviendo ningún valor en dicho campo.

La forma de corregirlo sería usar un filtro como el siguiente, en el cual fijásemos que haga match con cualquier palabra y todos aquellos caracteres que no sean un espacio:

Filtro grok modificado para incluir de forma correcta el cmapo logger_program.
Al probarlo en el depurador de grok de Kibana, vemos que ahora obtenemos los campos que queremos de forma correcta:

Comprobación de que el nuevo filtro es correcto.
La sintaxis empleada es la que podemos usar cuando la expresión regular que necesitamos, no se encuentra en los patrones disponibles incluidos con el plugin de filtro grok de Logstash. Podemos usarla directamente en la definición del filtro o, si queremos usarla de manera habitual, podemos extender la biblioteca de patterns incluyendo las nuestras y asignándoles un nombre para poder usarlas en múltiples pipelines.

Modificamos el fichero de configuración del filtro grok con esta definición de filtro y arrancamos Logstash. La salida que obtenemos en este caso es la siguiente:

Salida de Logstash con el filtro grok correcto.
Podemos ver como los cuatro campos que hemos definido están en la salida, además del resto generados por el plugin de entrada filebeat. Además, el campo message del registro ya no aparece al eliminarlo con la opción remove_field.

Ahora ya solo nos queda procesar el campo logger_message para obtener el resto de campos que necesitamos, para lo cual podríamos aplicar un filtro como el siguiente:

Filtro grok procesando todos los campos de message.

Cuando lo aplicamos, la salida de Logstash ya nos muestra los campos necesarios como podemos ver en la siguiente imagen:

Salida de Logstash con el filtro final grok.
En resumen, está claro que grok nos da mucha más potencia para procesar registros de tipo texto, pero con un coste de dificultad añadido debido al uso de expresiones regulares. En general, el uso de dissect o grok, dependerá de la estructura de los registros de texto que sea necesario procesar y, en algunos casos, será conveniente combinar ambos en el mismo pipeline.

En próximas entradas crearemos más de un pipeline, para ver como trabajan conjuntamente y enviaremos los datos a Elasticsearch para podeer realizar búsquedas sobre los datos recibidos.

Como referencia para el estudio de expresiones regulares, recomiendo usar la página https://regex101.com/ en la cual, además de poder probar expresiones regulares, nos explica cada una de las opciones y operadores que podemos usar para la construcción de expresiones regulares complejas.