problem description
When importing MySQL one-to-many (joined-table) data into Elasticsearch with logstash-input-jdbc, the rows returned by the join query are sometimes not stored correctly into the array fields of the resulting document; the import is only occasionally correct.
the platform version of the problem and what methods you have tried
system environment: MacOS Mojave 10.14
Mysql version: 5.6.23
ElasticSearch version: 5.5.2
logstash version: 6.4.2
related codes
logstash.conf configuration
# NOTE(review): the aggregate filter with push_previous_map_as_event relies on
# all rows of one task_id being processed consecutively by a single thread.
# Run Logstash with ONE pipeline worker (`-w 1` on the command line, or
# `pipeline.workers: 1` in logstash.yml). With multiple workers the rows of
# one user are split across threads and the todos/todo_list arrays come out
# incomplete only *sometimes* — exactly the intermittent symptom described.
input {
  stdin {
  }
  jdbc {
    # MySQL JDBC connection string
    jdbc_connection_string => "jdbc:mysql://localhost:3306/yii2basic"
    # validate the connection before using it
    jdbc_validate_connection => true
    # database credentials
    jdbc_user => "root"
    jdbc_password => "******"
    # path to the MySQL JDBC driver jar
    jdbc_driver_library => "/Users/simon/logstash/mysql-connector-java-5.1.46.jar"
    # JDBC driver class name
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # inline SQL; mutually exclusive with statement_filepath below
    # statement => "select * from users"
    # SQL statement loaded from a file
    statement_filepath => "/Users/simon/logstash/mysql_users.sql"
    # crontab-style schedule: run once every minute
    schedule => "* * * * *"
    # Elasticsearch document type; deprecated in ES 6.x (set document_type
    # on the output instead) and removed in 7.x
    #type => "users"
    # remember the last tracked value between runs, persisted to
    # last_run_metadata_path
    record_last_run => "true"
    last_run_metadata_path => "/Users/simon/logstash/sync_last_id"
    # set clean_run to true to discard the state in last_run_metadata_path
    # clean_run => "false"
    # track a column value instead of the last run timestamp
    # (requires record_last_run)
    use_column_value => true
    # the column whose value is tracked when use_column_value is true
    tracking_column => "id"
    # keep the original case of column names
    #lowercase_column_names => "false"
  }
}
filter {
  aggregate {
    # group all rows that share the same user id into one event
    task_id => "%{id}"
    # NOTE(review): nested unescaped double quotes inside a double-quoted
    # config string break parsing — use single quotes in the Ruby code
    code => "
      map['id'] = event.get('id')
      map['name'] = event.get('name')
      map['todo_list'] ||= []
      map['todos'] ||= []
      if (event.get('todo_id') != nil)
        if !(map['todo_list'].include? event.get('todo_id'))
          map['todo_list'] << event.get('todo_id')
          map['todos'] << {
            'todo_id' => event.get('todo_id'),
            'title' => event.get('text'),
          }
        end
      end
      event.cancel()
    "
    # emit the aggregated map as a new event when a row with a different
    # task_id arrives; requires the SQL to be ORDER BY the task_id column
    push_previous_map_as_event => true
    # flush the last pending map after this many seconds of inactivity,
    # otherwise the final user's aggregated event waits for the next run
    timeout => 3
  }
  json {
    source => "message"
    remove_field => ["message"]
    #remove_field => ["message", "type", "@timestamp", "@version"]
  }
  mutate {
    # drop bookkeeping fields so the document stored in ES stays clean
    remove_field => ["tags", "@timestamp", "@version"]
  }
}
# sync MySQL rows into Elasticsearch
output {
  elasticsearch {
    # ES endpoint URL
    hosts => "127.0.0.1:9200"
    # target index name
    index => "mysql_users"
    # document_type is deprecated in ES 6.x and removed in 7.x
    document_type => "users"
    # use the MySQL primary key as the ES document id so re-runs upsert
    document_id => "%{id}"
  }
  stdout {
    #codec => json_lines
  }
}
mysql_users.sql
-- Flatten users with their todos: one result row per (user, todo) pair,
-- later folded back into arrays by the Logstash aggregate filter.
SELECT
-- user fields repeated on every joined row
`users`.`id` AS `id`,
`users`.`name` AS `name`,
`todo`.`id` AS `todo_id`,
-- coalesce NULLs produced by the outer join to safe defaults
IFNULL(`todo`.`text`, "") AS `text`,
IFNULL(`todo`.`is_done`, 0) AS `is_done`,
`todo`.`user_id` AS `user_id`
FROM `users`
LEFT OUTER JOIN `todo` ON `todo`.`user_id` = `users`.`id`
-- NOTE(review): this WHERE on the right-hand table discards users that have
-- no todos, effectively turning the LEFT OUTER JOIN into an INNER JOIN —
-- confirm that dropping todo-less users is intended.
WHERE `todo`.id > 0
-- WHERE `users`.`id` > :sql_last_value
-- ORDER BY the aggregate task_id is REQUIRED for push_previous_map_as_event:
-- all rows of one user must arrive consecutively.
ORDER BY `id`
results of query from database
content after importing ElasticSearch
GET mysql_users/users/_search
{
"size": 200,
"sort" : [
{"id" : "desc"}
]
}
:
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": null,
"hits": [
{
"_index": "mysql_users",
"_type": "users",
"_id": "11",
"_score": null,
"_source": {
"id": 11,
"name": "Lily",
"todos": [
{
"todo_id": 8,
"title": ""
}
],
"todo_list": [
8
]
},
"sort": [
11
]
},
{
"_index": "mysql_users",
"_type": "users",
"_id": "2",
"_score": null,
"_source": {
"id": 2,
"name": "Jerry",
"todos": [
{
"todo_id": 5,
"title": "dddddd"
}
],
"todo_list": [
5
]
},
"sort": [
2
]
},
{
"_index": "mysql_users",
"_type": "users",
"_id": "1",
"_score": null,
"_source": {
"id": 1,
"name": "Simon",
"todos": [
{
"todo_id": 3,
"title": "bbbbb"
},
{
"todo_id": 4,
"title": "cccccc"
}
],
"todo_list": [
3,
4
]
},
"sort": [
1
]
}
]
}
}
what result do you expect? What is the error message actually seen?
The expectation is that the todos and todo_list arrays stored in each ES document correspond to, and are consistent with, the rows returned by the database query. Currently only the document with id = 1 is correct; the documents for ids 2 and 11 are each missing a record.