在CentOS系统上整合Hadoop分布式文件系统(HDFS)与Apache Kafka,通常会把Kafka当作数据的生成器或接收器,并且将数据存储到HDFS或者从HDFS获取数据。下面是一个简化版的应用场景,演示了怎样
利用Kafka把数据存储到HDFS。
kafka-topics.sh --create --topic order-created-topic --partitions 12 --replication-factor 3 --bootstrap-server localhost:9092
Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); Producerproducer = new KafkaProducer(props); producer.send(new ProducerRecord("order-created-topic", orderId, orderJson)); producer.close();
SparkConf conf = new SparkConf().setAppName("Kafka to HDFS");
conf.setMaster("local[*]");
JavaPairRDD lines = KafkaUtils.createDirectStream(
conf,
"order-created-topic",
new StringDeserializer(),
new StringDeserializer()
).mapToPair(record -> new Tuple2(record.value(), record.key()));
lines.saveAsHadoopFile("/path/to/hdfs/directory",
new TextOutputFormat(),
"org.apache.hadoop.mapred.lib.MultipleTextOutputFormat",
new Configuration(false)
);
请记住,上述代码样本和配置或许得依据实际环境做出改动。在真实应用里,还需要顾及到数据的序列化方法、错误处理、资源配置等细节。另外,对于生产环境,还需考量安全配置,例如SSL/TLS加密以及认证。