Commit 082881a4 authored by chinguyen's avatar chinguyen

Merge branch 'kafka' into 'develop'

add apache Kafka

See merge request !3
parents 3b3024b9 7e97eab1
...@@ -37,6 +37,7 @@ ...@@ -37,6 +37,7 @@
"cross-env": "7.0.0", "cross-env": "7.0.0",
"express": "4.17.1", "express": "4.17.1",
"express-boom": "3.0.0", "express-boom": "3.0.0",
"kafka-node": "^5.0.0",
"morgan": "1.9.1", "morgan": "1.9.1",
"pg": "^8.5.1", "pg": "^8.5.1",
"pg-hstore": "^2.3.3", "pg-hstore": "^2.3.3",
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
"development": { "development": {
"username": "postgres", "username": "postgres",
"password": "meu@sds12@!#gh", "password": "meu@sds12@!#gh",
"database": "node_sequelize", "database": "chinguyen_nodejs-practice",
"host": "27.74.255.96", "host": "27.74.255.96",
"port": 5430, "port": 5430,
"dialect": "postgres" "dialect": "postgres"
......
import * as LanguagesDao from './languages' import * as LanguagesDao from './languages'
import * as AppUserDao from './appusers' import * as AppUserDao from './appusers'
import * as StudentDao from './students' import * as StudentDao from './students'
import * as ConsumerDao from '../kafka/consumerMessage'
import * as ProducerDao from '../kafka/messageProducer'
export { LanguagesDao } export { LanguagesDao }
export { AppUserDao } export { AppUserDao }
export { StudentDao } export { StudentDao }
export { ConsumerDao }
export { ProducerDao }
import * as LanguageController from './languages/_index' import * as LanguageController from './languages/_index'
import * as AppUserController from './appusers/_index' import * as AppUserController from './appusers/_index'
import * as StudentsController from './students/_index' import * as StudentsController from './students/_index'
import * as ConsumerController from './consumer/_index'
import * as ProducerController from './producer/_index'
export { LanguageController, AppUserController, StudentsController } export { LanguageController, AppUserController, StudentsController, ConsumerController, ProducerController }
import * as ConsumerGet from './consumerMessage.get'
export { ConsumerGet }
import { Request, Response } from 'express'
import { ConsumerDao } from '../../dao/_index'
export function getMessage(req: Request, res: Response){
return ConsumerDao.receiveMessage();
}
\ No newline at end of file
import * as ProducerPost from './producerMessage.post'
export { ProducerPost }
\ No newline at end of file
import { ProducerDao } from '../../dao/_index'
import { Request, Response } from 'express'
export function producer(req: Request, res: Response){
return ProducerDao.produceMessage();
}
\ No newline at end of file
import * as kafka from 'kafka-node';
//import {getConsumer} from './message'
export function receiveMessage(): any{
const _that = this;
let topicName = 'message-chinguyen';
let produceTopic = topicName;
let kafkaConsumer = kafka.Consumer;
let Client = kafka.KafkaClient;
let client = new Client({kafkaHost: 'localhost:9092'})
let consumer = new kafkaConsumer(client,[{topic: produceTopic}],{autoCommit: false, fetchMaxWaitMs: 1000, fetchMaxBytes: 1024 * 1024})
consumer.on('message', function(message){
console.log(message);
});
consumer.on('error', function (err) {
console.log('error', err);
});
}
\ No newline at end of file
import * as kafka from 'kafka-node';
//import {} from 'optimist'
let clientOpts = {
kafkaHost: 'localhost:9092',
//sasl: configuration.kafkaClient.sasl,
connectTimeout: 10000,
maxAsyncRequests: 10,
requestTimeout: 30000
};
// Create Producer
let MessageProducer = kafka.Producer,
aaClient = new kafka.KafkaClient(clientOpts),
messageProducer = new MessageProducer(aaClient,{requireAcks: 1})
messageProducer.on('ready', function () {
setMessageProducer(messageProducer);
});
export function getMessageProducer(): kafka.Producer {
return messageProducer;
}
export function setMessageProducer(producerObj: kafka.Producer): void {
messageProducer = producerObj;
}
///
import { Producer } from 'kafka-node'
import {getMessageProducer} from './message'
export function produceMessage() {
const _that = this;
let producer = getMessageProducer();
let topicName = 'message-chinguyen';
let produceTopic = topicName;
let payloads: {
topic: any;
messages: any;
key: any;
attributes: any;
timestamp: any;
}[] = [];
let curTimestamp = Date.now();
payloads.push({topic: produceTopic, messages: 'hahahahaha', key: 1, attributes: 'haha', timestamp: curTimestamp});
if (payloads.length > 0) {
producer.send(payloads, function (err, data) {
if (err) {
console.log("error");
}
else {
console.log("success");
}
});
}
}
\ No newline at end of file
import { Express } from 'express'
import { ConsumerController, ProducerController } from '../endpoints/_index'
export function routes(app: Express){
app.post('/api/producerMessage', ProducerController.ProducerPost.producer)
app.get('/api/consumerMessage', ConsumerController.ConsumerGet.getMessage)
}
{ {
"development": { "development": {
"username": "postgres", "username": "postgres",
"password": "chinguyen@123", "password": "meu@sds12@!#gh",
"database": "node_sequelize", "database": "chinguyen_nodejs-practice",
"host": "127.0.0.1", "host": "27.74.255.96",
"port": 5432, "port": 5432,
"dialect": "postgres" "dialect": "postgres"
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment