I am using Egg.js to connect to Kafka. Each topic corresponds to a file name under app/kafka, and when a message arrives, all the methods exported by that file are executed. I wanted to handle this business logic in a separate directory under the app directory. At first I tried loadController, but when Kafka's onmessage was triggered I couldn't find the corresponding controller method, so in the end I solved it in a rough way. I don't know whether there are any problems with this approach; any advice would be appreciated. The code is as follows:
load kafka:
//kafka-load
import * as path from "path"
/**
 * Registers the modules found under `app/kafka` of every load unit on
 * `app.kafka`, keyed first by the module's file name (extension stripped)
 * and then by exported member name: `app.kafka[fileName][exportName]`.
 *
 * @param app Application object; must expose `loader.getLoadUnits()` and
 *            `loader.FileLoader`. An existing `app.kafka` is reused.
 */
export default app => {
  const kafkaDirs = app.loader
    .getLoadUnits()
    .map(unit => path.join(unit.path, "app", "kafka"))

  app.kafka = app.kafka || {}

  // Copies each exported member of one loaded module onto app.kafka.
  const registerHandlers = (handlers, opts) => {
    const extension = path.extname(opts.path)
    const fileName = path.basename(opts.path, extension)
    for (const action of Object.keys(handlers)) {
      // Created lazily so a module without exports leaves no empty entry.
      const bucket = app.kafka[fileName] || (app.kafka[fileName] = {})
      bucket[action] = handlers[action]
    }
    // Registration was done by hand above, so nothing is handed back.
    // NOTE(review): presumably a null result makes FileLoader skip its own
    // assignment onto `target` — confirm against egg-core.
    return null
  }

  const fileLoader = new app.loader.FileLoader({
    directory: kafkaDirs,
    target: app.kafka,
    initializer: registerHandlers
  })
  fileLoader.load()
}
kafka connect
//kafka connect
import * as Kafka from "kafka-node"
import { KafkaConfig } from "../config/config.d"
import load from "./kafkaLoad"
/**
 * Loads the Kafka handlers onto `app.kafka`, creates a kafka-node consumer
 * from `app.config.kafka`, and dispatches every received message to all
 * handlers registered under `app.kafka[message.topic]`.
 *
 * Each handler is invoked with `app` as `this` and `message.value` as its
 * only argument. A failing handler (synchronous throw or rejected promise)
 * is logged and does not prevent the remaining handlers from running.
 *
 * @param app Application object providing `config.kafka`, `logger`,
 *            `coreLogger` and `beforeStart`.
 */
export default app => {
  // Populate app.kafka before any message can be delivered.
  load(app)

  const config: KafkaConfig = app.config.kafka
  const zookeepers = config.host.join(",")
  const client = new Kafka.Client(zookeepers, config.clientId)
  const consumer = new Kafka.Consumer(client, config.topics, config.options)
  const topics = config.topics.map(item => item.topic)

  consumer.on("message", message => {
    const topicConsumers = app.kafka[message.topic]
    if (topicConsumers) {
      for (const name of Object.keys(topicConsumers)) {
        const reportFailure = error => {
          app.logger.error(
            `[egg-kafka] handler "${name}" failed for topic "${message.topic}"`,
            error
          )
        }
        try {
          // Still called synchronously, so ordering relative to the log
          // line below is unchanged.
          // NOTE(review): message.value is forwarded as received; handlers
          // that expect an object may need to parse it — confirm.
          const pending = topicConsumers[name].call(app, message.value)
          // Handlers are async: without a catch, a rejection would surface
          // as an unhandled promise rejection.
          Promise.resolve(pending).catch(reportFailure)
        } catch (error) {
          // Synchronous throw (or a non-function entry): keep dispatching
          // to the other handlers.
          reportFailure(error)
        }
      }
    }
    app.logger.info(
      `[egg-kafka] Receive producer message`,
      JSON.stringify(message)
    )
  })

  consumer.on("error", error => {
    app.coreLogger.error(`[egg-kafka] init instance error`, error)
  })

  app.beforeStart(() => {
    app.coreLogger.info(
      `[egg-kafka] init instance success ,host@${zookeepers} -----> topic@${topics}`
    )
  })
}
kafka controller
/**
 * Method signatures implemented by this file's default export.
 * Each method takes a single message object with arbitrary string keys
 * and returns a promise.
 */
export interface TopicNodejsMethods {
// Implemented below by emitting the message as a "passAlarm" event.
test1(message: { [key: string]: any }): Promise<any>
// Implemented below by printing the message to the console.
test2(message: { [key: string]: any }): Promise<any>
}
// `Application` combined with the handler methods; the default export below
// is asserted to this type.
// NOTE(review): `Application` is not imported in this snippet — presumably
// egg's Application type; confirm the import exists in the real file.
export type TopicNodejs = Application & TopicNodejsMethods
/**
 * Handler methods exported by this module. `this` inside each method is
 * whatever the caller binds; the object is asserted to `TopicNodejs`.
 */
export default {
  // Emits the message as a "passAlarm" event on the "/" namespace of `this.io`.
  async test1(message) {
    const rootNamespace = this.io.of("/")
    rootNamespace.emit("passAlarm", message)
  },

  // Prints the message to the console.
  async test2(message) {
    console.log(message)
  }
} as TopicNodejs