problem description
When I use the logstash-input-jdbc plugin to synchronize MySQL data to Elasticsearch, the SQL query matches 13 users,
but only 12 documents are saved to ES. The record I added to the table last is never synchronized; only when I stop Logstash from the terminal (Ctrl + C) does it write that last record to ES before the process exits.
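A quick way to compare the two counts (assuming the default local ES endpoint and the mysql_users index from the config below):

# distinct users matched by the query (returns 13)
mysql -uroot -p123456 test -e "SELECT COUNT(DISTINCT id) FROM users"
# documents actually indexed (returns 12)
curl -s "http://127.0.0.1:9200/mysql_users/_count?pretty"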
log of execution output:
Logstash produces this log when I exit it with Ctrl + C:
[2018-10-31T11:40:07,422][WARN ][logstash.agent ] stopping pipeline {:id=>"main"}
{"name":"Kimi","id":14,"todos":[],"todo_list":[],"tags":["_aggregatefinalflush"]}
This log shows that Logstash wrote the unsaved record to ES before exiting, but the document carries an extra tags field with the value _aggregatefinalflush, which the aggregate filter adds when it flushes its remaining maps at pipeline shutdown.
platform versions on which the problem occurs
Elasticsearch version -> 5.5.2
Logstash version -> 5.5.2
Development and test platform: macOS Mojave 10.14
related code
Logstash configuration:
input {
  stdin {
  }
  jdbc {
    # MySQL connection string
    jdbc_connection_string => "jdbc:mysql://localhost:3306/test"
    # validate the connection before using it
    jdbc_validate_connection => true
    # database user
    jdbc_user => "root"
    # database password
    jdbc_password => "123456"
    # path to the JDBC driver jar
    jdbc_driver_library => "/Users/simon/logstash/mysql-connector-java-5.1.36.jar"
    # JDBC driver class
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # inline SQL statement; mutually exclusive with statement_filepath
    # statement => "select * from users"
    # file containing the SQL statement to execute
    statement_filepath => "/Users/simon/logstash/mysql_users.sql"
    # schedule in Crontab syntax; "* * * * *" runs once a minute
    schedule => "* * * * *"
    # page the SQL query in chunks of jdbc_page_size rows
    # jdbc_paging_enabled => true
    # page size when jdbc_paging_enabled is true (default 100000)
    # jdbc_page_size => 1
    # Elasticsearch document type; set document_type in the ES 6.x output, removed in 7.x
    # type => "users"
    # record where the last run stopped so the next run resumes from
    # tracking_column; the value is persisted at last_run_metadata_path
    record_last_run => "true"
    last_run_metadata_path => "/Users/simon/logstash/sync_last_id"
    # if true, ignore the value saved at last_run_metadata_path
    # clean_run => "false"
    # track a column value instead of the last run timestamp;
    # requires record_last_run to be true
    use_column_value => true
    # the column to track when use_column_value is true (the MySQL primary key here)
    tracking_column => "id"
    # whether to lowercase column names
    # lowercase_column_names => "false"
  }
}
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      # collect each user's todo rows into one aggregated map
      map['id'] = event.get('id')
      map['name'] = event.get('name')
      map['todo_list'] ||= []
      map['todos'] ||= []
      if (event.get('todo_id') != nil)
        if !(map['todo_list'].include? event.get('todo_id'))
          map['todo_list'] << event.get('todo_id')
          map['todos'] << {
            'todo_id' => event.get('todo_id'),
            'title' => event.get('text'),
          }
        end
      end
      event.cancel()
    "
    push_previous_map_as_event => true
  }
  json {
    source => "message"
    remove_field => ["message"]
    # remove_field => ["message", "type", "@timestamp", "@version"]
  }
  mutate {
    # strip Logstash metadata so only the JSON document reaches ES
    remove_field => ["@timestamp", "@version"]
  }
}
# write the MySQL rows into Elasticsearch
output {
  elasticsearch {
    # ES endpoint URL
    hosts => ["127.0.0.1:9200"]
    # ES index name
    index => "mysql_users"
    # document_type is set in the output for ES 6.x and removed in 7.x
    document_type => "users"
    # use the MySQL id as the document ID so re-runs overwrite instead of duplicating
    document_id => "%{id}"
    codec => "json"
  }
  stdout {
    codec => json_lines
  }
}
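As far as I understand, push_previous_map_as_event => true only pushes a finished map when an event with a new task_id arrives, so the map for the last user stays in memory until pipeline shutdown flushes it (and tags it _aggregatefinalflush). A sketch of a timeout-based flush instead, assuming the bundled logstash-filter-aggregate is recent enough to support push_map_as_event_on_timeout; the 10-second timeout is an arbitrary choice:

filter {
  aggregate {
    task_id => "%{id}"
    code => "..."  # same aggregation code as above
    # push the map as an event once no row with this task_id has been
    # seen for `timeout` seconds, instead of waiting for the next
    # task_id or for pipeline shutdown
    push_map_as_event_on_timeout => true
    timeout => 10
    # keep the task_id on the flushed event
    timeout_task_id_field => "id"
  }
}

A mutate { remove_tag => ["_aggregatefinalflush"] } in the filter block would also strip the extra tag from whatever the final flush emits.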
Content of the executed SQL file mysql_users.sql:
SELECT
`users`.`id` AS `id`,
`users`.`name` AS `name`,
`todo`.`id` AS `todo_id`,
IFNULL(`todo`.`text`, "") AS `text`,
IFNULL(`todo`.`is_done`, 0) AS `is_done`,
`todo`.`user_id` AS `user_id`
FROM `users`
LEFT JOIN `todo` ON `users`.`id` = `todo`.`user_id`
WHERE `users`.`id` > :sql_last_value
ORDER BY `id` ASC
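For context, :sql_last_value in the WHERE clause is filled from the file at last_run_metadata_path; with use_column_value => true and tracking_column => "id", the plugin persists the highest id it has read as YAML. On this data it should hold 14 after the first run, e.g.:

$ cat /Users/simon/logstash/sync_last_id
--- 14

which is presumably why later scheduled runs select no new rows, so no event with a new task_id ever arrives to push the last user's map out of the aggregate filter.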
Database table creation script and test data:
DROP TABLE IF EXISTS `todo`;
CREATE TABLE `todo` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`text` varchar(255) NOT NULL DEFAULT "" COMMENT "",
`is_done` tinyint(3) DEFAULT "0" COMMENT "",
`user_id` int(11) DEFAULT "0",
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
LOCK TABLES `todo` WRITE;
/*!40000 ALTER TABLE `todo` DISABLE KEYS */;
INSERT INTO `todo` (`id`, `text`, `is_done`, `user_id`)
VALUES
(3,"bbbbb",0,1),
(4,"cccccc",0,1),
(5,"",0,2),
(6,"Vue",0,2),
(7,"Hello world",0,11),
(8,"",0,11),
(10,"",0,1),
(11,"",0,1),
(12,"",0,9),
(13,"",0,9);
/*!40000 ALTER TABLE `todo` ENABLE KEYS */;
UNLOCK TABLES;
DROP TABLE IF EXISTS `users`;
CREATE TABLE `users` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT "",
`version` int(11) DEFAULT "0",
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
LOCK TABLES `users` WRITE;
/*!40000 ALTER TABLE `users` DISABLE KEYS */;
INSERT INTO `users` (`id`, `name`, `version`)
VALUES
(1,"Simon",0),
(2,"Jerry",0),
(4,"Jim",0),
(5,"Mary",0),
(6,"Amy",0),
(7,"Kaiven",0),
(8,"Bell",0),
(9,"Sky",0),
(10,"Sam",0),
(11,"Lily",0),
(12,"Lucy",0),
(13,"David",0),
(14,"Kimi",0);
/*!40000 ALTER TABLE `users` ENABLE KEYS */;
UNLOCK TABLES;
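With this data the join covers 13 distinct users (ids 1-2 and 4-14), so the aggregation should produce 13 documents; a quick per-user check:

SELECT `users`.`id`, `users`.`name`, COUNT(`todo`.`id`) AS `todo_count`
FROM `users`
LEFT JOIN `todo` ON `users`.`id` = `todo`.`user_id`
GROUP BY `users`.`id`, `users`.`name`
ORDER BY `users`.`id`;

The last row is id 14 (Kimi) with zero todos, which matches the document that only appears on shutdown.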
what result do you expect? What error message do you actually see?
I expect the last record to be synchronized to ES as well, instead of only being written when the process ends.
Execution is otherwise normal, with no error messages or warnings.