Thursday, September 29, 2016

Setting up PHP-Kafka using librdkafka wrapper phpkafka from EVODelavega under Ubuntu.


We will install Kafka support for PHP using the following libraries:

https://github.com/EVODelavega/phpkafka
https://github.com/edenhill/librdkafka

Setting up
==========
Fast way (Only Ubuntu)
--------

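$ sudo apt-get install librdkafka1 librdkafka-dev

The librdkafka-dev package provides the headers needed later to compile the PHP extension.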

Dependencies
------------
If you build librdkafka manually instead, first install some dependencies:

$ sudo apt-get install libsasl2-dev liblz4-dev

Installing librdkafka from source (Other OS)
--------------------------------------------

$ git clone https://github.com/edenhill/librdkafka/
$ cd librdkafka
$ ./configure
$ make
$ make test
$ sudo make install

Installing phpkafka Extension
-----------------------------

$ git clone https://github.com/EVODelavega/phpkafka.git
$ cd phpkafka
$ phpize
$ ./configure --enable-kafka
$ sudo make install

Once the extension is enabled in the PHP configuration (next section), you can check that it is in place:

$ php -m | grep kafka
kafka


Final PHP configuration
-----------------------
1) Create a PHP file containing only the call to phpinfo()

2) Run the script from your browser and check the path of the config ini files that your system is using. For example, my additional INI files are in /etc/php5/fpm/conf.d.

3) Create a new file librd.conf in /etc/ld.so.conf.d (the dynamic linker's configuration directory, not the PHP INI directory). Example:

$ sudo touch /etc/ld.so.conf.d/librd.conf

4) Add the path of your libraries, /usr/local/lib, to that file. For example:

$ echo "/usr/local/lib" | sudo tee -a /etc/ld.so.conf.d/librd.conf

5) Create the file 20-kafka.ini in your additional INI directory:

$ sudo vi /etc/php5/fpm/conf.d/20-kafka.ini

and insert the line:

extension=kafka.so

6) Run the command

$ sudo ldconfig

7) Check that everything is fine and the librdkafka library is visible to the linker

$ ldconfig -p | grep kafka

8) Restart the services

$ sudo /etc/init.d/php5-fpm restart
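As an alternative to reading the full phpinfo() page, the short script below is a minimal sketch that prints which INI files PHP picked up and whether the extension is active. It uses only core PHP functions; describe_php_setup is a name made up for this example. Note that running it from the command line reports the CLI configuration, which may differ from the FPM one, so to check FPM it should be served through the web server.

```php
<?php
// Hypothetical helper: summarise the INI files in use and the state of one extension.
function describe_php_setup($extension)
{
    // php_ini_scanned_files() returns a comma-separated string, or false if no scan dir is set.
    $scanned = php_ini_scanned_files();

    return [
        'loaded_ini'   => php_ini_loaded_file() ?: null,
        'scanned_inis' => $scanned ? array_map('trim', explode(',', $scanned)) : [],
        'extension_on' => extension_loaded($extension),
    ];
}

// 'kafka' is the module name that `php -m` prints for this extension.
$setup = describe_php_setup('kafka');

echo 'Main php.ini:   ', $setup['loaded_ini'] ?: '(none)', PHP_EOL;
echo 'Additional INI: ', implode(', ', $setup['scanned_inis']) ?: '(none)', PHP_EOL;
echo 'kafka loaded:   ', $setup['extension_on'] ? 'yes' : 'no', PHP_EOL;
```

If 20-kafka.ini does not appear in the list of additional INI files, it is probably in a directory that this PHP binary does not scan.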


Setting up Kafka with Docker for testing
=========================================

1) Download and install the containers (Check https://hub.docker.com/r/confluent/platform/)

$ docker-machine start
$ eval $(docker-machine env)
$ docker pull confluent/platform

2) Run the containers

# Start Zookeeper and expose port 2181 for use by the host machine
docker run -d --name zookeeper -p 2181:2181 confluent/zookeeper

# Start Kafka and expose port 9092 for use by the host machine
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper confluent/kafka

# Start Schema Registry and expose port 8081 for use by the host machine
docker run -d --name schema-registry -p 8081:8081 --link zookeeper:zookeeper \
--link kafka:kafka confluent/schema-registry

# Start REST Proxy and expose port 8082 for use by the host machine
docker run -d --name rest-proxy -p 8082:8082 --link zookeeper:zookeeper \
--link kafka:kafka --link schema-registry:schema-registry confluent/rest-proxy

3) Everything is now ready to continue.
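Before moving on, it can help to confirm that the exposed ports are reachable from the host. The following is a rough sketch using PHP's fsockopen(); broker_reachable is a hypothetical helper name, and 192.168.99.100 is simply the Docker machine address used by the scripts later in this post (check yours with `docker-machine ip`). An open port only shows that something is listening, not that Kafka itself is healthy.

```php
<?php
// Hypothetical helper: true if a TCP connection to $host:$port succeeds within $timeout seconds.
function broker_reachable($host, $port, $timeout = 1.0)
{
    $errno = 0;
    $errstr = '';
    // @ silences the warning fsockopen() emits on failure; the result is reported via the return value.
    $socket = @fsockopen($host, $port, $errno, $errstr, $timeout);
    if ($socket === false) {
        return false;
    }
    fclose($socket);
    return true;
}

// Ports exposed by the docker run commands above.
$services = [
    'zookeeper'       => 2181,
    'kafka'           => 9092,
    'schema-registry' => 8081,
    'rest-proxy'      => 8082,
];

// Assumed address of the Docker machine; replace it with your own.
$host = '192.168.99.100';

foreach ($services as $name => $port) {
    printf("%-16s %5d  %s\n", $name, $port, broker_reachable($host, $port) ? 'open' : 'closed');
}
```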


Working with kafka
==================

Create the files producer.php and consumer.php as shown in this section. For testing, open two terminals and run the consumer.php script first. In the other terminal, run the producer.php script as many times as you want. You should see the messages from the producer being picked up by the consumer.

PHP Producer (producer.php)
------------
<?php

// Avro-style schema describing each log record
$avro_schema = [
    "namespace" => "yournamespace",
    "type" => "record",
    "name" => "machineLog",
    "doc" => "That is the documentation",
    "fields" => [
        ["name" => "host", "type" => "string"],
        ["name" => "log", "type" => "string"],
    ]
];

// Each record carries both fields declared in the schema: "host" and "log"
$records = [
    [ "host" => "hostname1", "log" => "cpu 67590 0 28263 13941723 602 7 1161 0 0 0" ],
    [ "host" => "hostname2", "log" => "cpu0 67591 0 28266 13944700 602 7 1161 0 0 0" ]
];

// The message sent to Kafka is a single JSON string
$msg = json_encode([
    "value_schema" => $avro_schema,
    "records" => $records
]);

//var_dump($msg);
// Broker address; replace it with the IP of your Docker machine
$kafka = new Kafka("192.168.99.100:9092");
try {
    $kafka->produce("jsontest", $msg);
} catch (Exception $e) {
    echo $e->getMessage() . PHP_EOL;
}

$kafka->disconnect(Kafka::MODE_PRODUCER);
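A record only matches the schema if it has both a "host" and a "log" key. PHP silently keeps the last value when an array literal repeats a key, so a mistyped record is easy to miss. The sketch below shows one possible pre-flight check before producing; find_invalid_records is a hypothetical helper, not part of phpkafka, and it only compares key names, not Avro types.

```php
<?php
// Hypothetical helper: return the indexes of records whose keys differ from the schema's field names.
function find_invalid_records(array $schema, array $records)
{
    $expected = array_map(function ($field) {
        return $field['name'];
    }, $schema['fields']);
    sort($expected);

    $invalid = [];
    foreach ($records as $index => $record) {
        $actual = array_keys($record);
        sort($actual);
        if ($actual !== $expected) {
            $invalid[] = $index;
        }
    }
    return $invalid;
}

$schema = [
    "type"   => "record",
    "name"   => "machineLog",
    "fields" => [
        ["name" => "host", "type" => "string"],
        ["name" => "log",  "type" => "string"],
    ],
];

$records = [
    ["host" => "hostname1", "log" => "cpu 67590 0 28263"],
    // "host" is repeated here: PHP keeps only the last value, so "log" is missing.
    ["host" => "hostname2", "host" => "cpu0 67591 0 28266"],
];

// Reports index 1, the record with the repeated key.
var_dump(find_invalid_records($schema, $records));
```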



PHP Consumer (consumer.php)
------------
<?php

$kafka = new Kafka("192.168.99.100:9092");
// Read from the first partition of the topic
$partitions = $kafka->getPartitionsForTopic('jsontest');
$kafka->setPartition($partitions[0]);
$offset = 1; // offset to start reading from
$size = 1;   // batch size passed to consume()

while (true) {
    try {
        $messages = $kafka->consume("jsontest", $offset, $size);
        if (count($messages) > 0) {
            foreach ($messages as $message) {
                echo $message . PHP_EOL;
                $offset += 1; // advance past each message read
            }
        }
    } catch (Exception $e) {
        echo $e->getMessage() . PHP_EOL;
        break;
    }
}

$kafka->disconnect();
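The consumer prints each message as the raw JSON string written by producer.php. If you would rather see one line per log record, the message can be decoded first. This is a minimal sketch assuming the messages are strings with a top-level "records" array; format_log_message is a hypothetical helper and the sample string stands in for a value returned by consume(), so no broker is needed to try it.

```php
<?php
// Hypothetical helper: turn one consumed JSON message into "host: log" lines.
function format_log_message($message)
{
    $payload = json_decode($message, true);
    if (!is_array($payload) || !isset($payload['records']) || !is_array($payload['records'])) {
        return []; // not the JSON shape written by producer.php
    }

    $lines = [];
    foreach ($payload['records'] as $record) {
        $host = isset($record['host']) ? $record['host'] : '(unknown host)';
        $log  = isset($record['log']) ? $record['log'] : '(no log)';
        $lines[] = $host . ': ' . $log;
    }
    return $lines;
}

// Stand-in for a message string returned by the consumer.
$sample = json_encode([
    "records" => [
        ["host" => "hostname1", "log" => "cpu 67590 0 28263"],
    ],
]);

foreach (format_log_message($sample) as $line) {
    echo $line . PHP_EOL; // prints "hostname1: cpu 67590 0 28263"
}
```

Inside the consumer loop, the echo of $message could be replaced by a loop over format_log_message($message).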


That's all! Enjoy Kafka!
