从 Kafka 写入
Kafka 介绍
Apache Kafka 是开源的分布式消息分发平台,被广泛应用于高性能数据管道、流式数据分析、数据集成和事件驱动类型的应用程序。Kafka 包含 Producer、Consumer 和 Topic,其中 Producer 是向 Kafka 发送消息的进程,Consumer 是从 Kafka 消费消息的进程。Kafka 相关概念可以参考官方文档。
kafka topic
Kafka 的消息按 topic 组织,每个 topic 会有一到多个 partition。可以通过 kafka 的 kafka-topics
管理 topic。
创建名为 kafka-events
的topic:
bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092
修改 kafka-events
的 partition 数量为 3:
bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092
展示所有的 topic 和 partition:
bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe
写入 TDengine
TDengine 支持 Sql 方式和 Schemaless 方式的数据写入,Sql 方式数据写入可以参考 TDengine SQL 写入 和 TDengine 高效写入。Schemaless 方式数据写入可以参考 TDengine Schemaless 写入 文档。
示例代码
- Python
python Kafka 客户端
Kafka 的 python 客户端可以参考文档 kafka client。推荐使用 confluent-kafka-python 和 kafka-python。以下示例以 kafka-python 为例。
从 Kafka 消费数据
Kafka 客户端采用 pull 的方式从 Kafka 消费数据,可以采用单条消费的方式或批量消费的方式读取数据。使用 kafka-python 客户端单条消费数据的示例如下:
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
print (msg)
单条消费的方式在数据流量大的情况下往往存在性能瓶颈,导致 Kafka 消息积压,更推荐使用批量消费的方式消费数据。使用 kafka-python 客户端批量消费数据的示例如下:
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
while True:
msgs = consumer.poll(timeout_ms=500, max_records=1000)
if msgs:
print (msgs)
Python 多线程
为了提高数据写入效率,通常采用多线程的方式写入数据,可以使用 python 线程池 ThreadPoolExecutor 实现多线程。示例代码如下:
from concurrent.futures import ThreadPoolExecutor, Future
pool = ThreadPoolExecutor(max_workers=10)
pool.submit(...)