A POC for multilingual (Python, Javascript, Ruby, etc) UDFs in KSQL. The KSQL changes I've been working on are now conventiently packaged inside a Docker image for demonstration purposes :)
I wanted to see if I could implement multilingual UDFs in KSQL. I was able to get this working with the help of GraalVM, but the effort is still a (fairly advanced) work in progress and I'd like to coordinate with others who are either actively working on this, or who have feedback.
The actual work is happening in this branch, and you're free to build that branch locally + run your own tests without the aid of the magicalpipelines/ksql-multilingual-udfs
Docker image included here (note: you'll need to run KSQL using GraalVM for multilingual UDFs to work). The reason I created this Docker image is so that others who don't have a lot of experiencing building the KSQL project / sub-projects can test this functionality out very easily. Also, those who don't have the patience to install GraalVM can instead just use this image as described in this doc.
See KLIP-2 for more info about the motivation behind multilingual UDFs
- a running Kafka cluster (examples reference a bootstrap server at
localhost:9092
. update accordingly) - kafkacat (optional)
-
# tab 1 $ docker run --net=host \ -e BOOTSTRAP_SERVERS=localhost:9092 \ -ti magicalpipelines/ksql-multilingual-udfs:latest
-
# tab 2 $ docker run --net=host \ -ti magicalpipelines/ksql-multilingual-udfs:latest \ ksql
Once the CLI has started, set the following config:
ksql> SET 'auto.offset.reset' = 'earliest';
-
Create a dummy topic named
api_logs
. If your cluster is configured to auto-create topics, then you can skip to the next step, which will create the topic once you produce some dummy data. -
$ echo '{"endpoint": "about.html", "status_code": 200}' | kafkacat -P -b localhost:9092 -t api_logs && \ echo '{"endpoint": "index.html", "status_code": 200}' | kafkacat -P -b localhost:9092 -t api_logs && \ echo '{"endpoint": "contact.php", "status_code": 404}' | kafkacat -P -b localhost:9092 -t api_logs
-
ksql> CREATE STREAM api_logs (endpoint VARCHAR, status_code INT) WITH (kafka_topic='api_logs', value_format='JSON');
Now, in the CLI, run the following commands to try out multilingual UDFS :)
Create a Javascript UDF with the following command:
ksql> CREATE OR REPLACE FUNCTION STATUS_MAJOR(status_code INT)
RETURNS VARCHAR
LANGUAGE JAVASCRIPT AS $$
(code) => code.toString().charAt(0) + 'xx'
$$
WITH (author='Mitch Seymour', description='js udf example', version='0.1.0');
Verify the new UDF exists:
ksql> DESCRIBE FUNCTION STATUS_MAJOR ;
Name : STATUS_MAJOR
Author : 'Mitch Seymour'
Version : '0.1.0'
Type : scalar
Jar : internal
Variations :
Variation : STATUS_MAJOR(INT)
Returns : VARCHAR
Description : 'js udf example'
Invoke the new UDF:
SELECT endpoint, status_code, status_major(status_code)
FROM api_logs ;
Verify the output:
about.html | 200 | 2xx
index.html | 200 | 2xx
contact.php | 404 | 4xx
Create a Python UDF with the following command:
ksql> CREATE OR REPLACE FUNCTION ENDPOINT_TYPE(endpoint VARCHAR)
RETURNS VARCHAR
LANGUAGE PYTHON AS $$
lambda endpoint: endpoint.split(".")[1]
$$
WITH (author='Mitch Seymour', description='python udf example', version='0.1.0');
Verify the new UDF exists:
ksql> DESCRIBE FUNCTION ENDPOINT_TYPE ;
Name : ENDPOINT_TYPE
Author : 'Mitch Seymour'
Version : '0.1.0'
Type : scalar
Jar : internal
Variations :
Variation : ENDPOINT_TYPE(VARCHAR)
Returns : VARCHAR
Description : 'python udf example'
Invoke the new UDF. Note: the cold start with Python is a little more noticable than the Javascript UDF. But it should only incurred when the UDF is first instantiated.
SELECT endpoint, endpoint_type(endpoint)
FROM api_logs ;
Verify the output:
about.html | html
index.html | html
contact.php | php
Create a Ruby UDF with the following command:
ksql> CREATE OR REPLACE FUNCTION REVERSE(endpoint VARCHAR)
RETURNS VARCHAR
LANGUAGE RUBY AS $$
lambda { |x| x.reverse }
$$
WITH (author='Mitch Seymour', description='ruby udf example', version='0.1.0');
Verify the new UDF exists:
ksql> DESCRIBE FUNCTION REVERSE ;
Name : REVERSE
Author : 'Mitch Seymour'
Version : '0.1.0'
Type : scalar
Jar : internal
Variations :
Variation : REVERSE(VARCHAR)
Returns : VARCHAR
Description : 'ruby udf example'
Invoke the new UDF:
SELECT endpoint, reverse(endpoint)
FROM api_logs ;
Verify the output:
about.html | lmth.tuoba
index.html | lmth.xedni
contact.php | php.tcatnoc
FROM FILE
implementation?KLIP?- Test headless mode!
- Unit tests in the branch itself
- Bug hunting / fixing