# Kafka

## Reference links

## Configuration

### Certifcate Generation

```makefile
KAFKA_ALIAS := localhost
KAFKA_CERTS_PATH := ./.data/kafka/config/certs
KAFKA_CA_KEY_PATH := ${KAFKA_CERTS_PATH}/ca-key
KAFKA_CA_CERT_PATH := ${KAFKA_CERTS_PATH}/ca-cert

KAFKA_CLIENT_CERT_PATH := ${KAFKA_CERTS_PATH}/client-cert
KAFKA_CLIENT_KEY_PATH := ${KAFKA_CERTS_PATH}/client-key
KAFKA_CLIENT_P12_PATH := ${KAFKA_CERTS_PATH}/client.p12

KAFKA_JKS_KEYSTORE_PATH := ${KAFKA_CERTS_PATH}/kafka.keystore.jks
KAFKA_JKS_TRUSTSTORE_PATH := ${KAFKA_CERTS_PATH}/kafka.truststore.jks

kafka-jks: # ref https://www.ibm.com/docs/en/cloud-paks/cp-biz-automation/20.0.x?topic=emitter-preparing-ssl-certificates-kafka
	rm -rf ${KAFKA_CERTS_PATH}/*
	mkdir -p ${KAFKA_CERTS_PATH}
	echo '*' > ${KAFKA_CERTS_PATH}/.gitignore
	echo '!.gitignore' >> ${KAFKA_CERTS_PATH}/.gitignore

	# create certificate authority
	openssl req -new -x509 -keyout ${KAFKA_CA_KEY_PATH} -out ${KAFKA_CA_CERT_PATH} -days 365

	# create client certificate
	openssl req -new -newkey rsa:2048 -nodes -keyout ${KAFKA_CLIENT_KEY_PATH} -out ${KAFKA_CLIENT_CERT_PATH} -days 365
	openssl x509 -req -days 365 -in ${KAFKA_CLIENT_CERT_PATH} -CA ${KAFKA_CA_CERT_PATH} -CAkey ${KAFKA_CA_KEY_PATH} -out ${KAFKA_CLIENT_CERT_PATH} -set_serial 01 -sha256

	# package client data into client keystore
	openssl pkcs12 -export -in ${KAFKA_CLIENT_CERT_PATH} -inkey ${KAFKA_CLIENT_KEY_PATH} -name user > ${KAFKA_CLIENT_P12_PATH}
	keytool -importkeystore -srckeystore ${KAFKA_CLIENT_P12_PATH} -destkeystore ${KAFKA_JKS_KEYSTORE_PATH} -srcstoretype pkcs12 -alias user

	# package certificate authority into server truststore
	keytool -keystore ${KAFKA_JKS_TRUSTSTORE_PATH} -alias CARoot -import -file ${KAFKA_CA_CERT_PATH}

	chmod 644 ${KAFKA_CERTS_PATH}/*
```

## Docker

### Image

{% embed url="<https://hub.docker.com/r/bitnami/kafka/>" %}

### Compose

```yaml
version: "3.7"
services:
  kafka: #
    # image reference: https://hub.docker.com/r/bitnami/kafka/
    image: bitnami/kafka:3.5.1
    environment:
    - KAFKA_CFG_NODE_ID=0
    - KAFKA_CFG_PROCESS_ROLES=controller,broker
    - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093
    - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://:9092
    - KAFKA_CLIENT_USERS=user
    - KAFKA_CLIENT_PASSWORDS=password
    - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
    - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
    - KAFKA_CONTROLLER_USER=controller_user
    - KAFKA_CONTROLLER_PASSWORD=controller_password
    - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
    - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
    - KAFKA_INTER_BROKER_USER=controller_user
    - KAFKA_INTER_BROKER_PASSWORD=controller_password
    - KAFKA_CERTIFICATE_PASSWORD=password
    - KAFKA_TLS_TYPE=JKS
    ports:
      - '9092:9092'
      - '9093:9093'
    network_mode: host
    volumes: # [] # uncomment this and comment below to remove persistence
      - ./.data/kafka/data:/bitnami/kafka/data
      - ./.data/kafka/config:/bitnami/kafka/config
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://notes.joeir.net/application-infrastructure/message-brokers/kafka.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
