自己编写 producer 和 consumer
-
首先在两个不同的终端中分别启动一个 consumer,并保证两者的 group_id 一致:
]# python consumer_kafka.py
-
执行一次producer
]# python producer_kafka.py
-
通过指定 key 将消息发送到对应的 partition(按 key 哈希选择分区):
from kafka import KafkaProducer

# Producer connected to the local broker.
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Alternative example (disabled): send ten un-keyed messages, then block
# until all pending messages are sent.
# for _ in range(10):
#     producer.send('test_m_brokers', b'are you ok!!!')
# producer.flush()

# Keyed send: the key is used for hashed partitioning.
# NOTE(review): the key is empty bytes here; presumably a real, non-empty key
# is intended when demonstrating key-based partitioning -- confirm.
producer.send('zhongqiu_many_brokers', key=b'', value=b'aaa')

# Block until all pending messages are sent.
producer.flush()
指定 partition 和 offset 读取数据:
#encoding=utf8
from kafka import KafkaConsumer
from kafka import TopicPartition
from kafka.structs import OffsetAndMetadata
from kafka.structs import TopicPartition


def main():
    """Read 'zhongqiu_many_brokers' partition 0 starting at offset 10.

    Connects to the broker at master:9092, prints some consumer metadata
    (partitions of the topic, topic list, subscription, assignment and
    beginning offsets), moves the fetch position of partition 0 to offset 10
    and then prints every message received, looping until interrupted.
    """
    topic = u'zhongqiu_many_brokers'
    target = TopicPartition(topic=topic, partition=0)

    # seek() is only valid for partitions currently assigned to the consumer,
    # so the partition is assigned manually instead of subscribing to the
    # topic (a subscription has no assignment until the first poll).
    consumer = KafkaConsumer(bootstrap_servers=['master:9092'])
    consumer.assign([target])

    print(consumer.partitions_for_topic("zhongqiu_many_brokers"))
    print(consumer.topics())  # list of topics
    print(consumer.subscription())  # topics this consumer is subscribed to
    print(consumer.assignment())  # topic/partition info of this consumer
    # Earliest offsets this consumer can read for its assigned partitions.
    print(consumer.beginning_offsets(consumer.assignment()))

    # Reset the fetch offset of the target partition.
    consumer.seek(target, 10)

    for message in consumer:
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))


if __name__ == "__main__":
    main()