Create an input stream that pulls message from a Aliyun ONS stream.
Create an input stream that pulls message from a Aliyun ONS stream.
JavaStreamingContext jssc = ... String cid = "ConsumerID" String topic = "sample-topic" String subExpression = "*" String accessKeyId = "kj7aY*******UYx6" String accessKeySecret = "AiNMAlxz*************1PxaPaL8t" String accessKeySecret = "" static class ReadMessage implements Function<Message, Byte[]> { @Override public Byte[] call(Message msg) { return msg.getBody; } JavaReceiverInputDStream<Byte[]> onsStream = OnsUtils.createStream( ssc, cId, topic, subExpression, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK_2, ReadMessage) onsStream.foreachRDD(rdd => { ... })
Java streamingContext object
Name of a set of consumers
Which topic to subscribe
Which tag to subscribe
Aliyun Access Key ID
Aliyun Access Key Secret
Storage level to use for storing the received objects. StorageLevel.MEMORY_AND_DISK_2 is recommended.
Extract information from ONS message
Create an input stream that pulls message from a Aliyun ONS stream.
Create an input stream that pulls message from a Aliyun ONS stream.
val ssc: StreamingSparkContext = ... val cid = "ConsumerID" val topic = "sample-topic" val subExpression = "*" val accessKeyId = "kj7aY*******UYx6" val accessKeySecret = "AiNMAlxz*************1PxaPaL8t" val accessKeySecret = "" def func: Message => Array[Byte] = msg => msg.getBody val onsStream = OnsUtils.createStream( ssc, cId, topic, subExpression, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK_2, func) onsStream.foreachRDD(rdd => { ... })
StreamingContext object
Name of a set of consumers
Which topic to subscribe
Which tag to subscribe
Aliyun Access Key ID
Aliyun Access Key Secret
Storage level to use for storing the received objects. StorageLevel.MEMORY_AND_DISK_2 is recommended.
Extract information from ONS message
Create an union input stream that pulls message from a Aliyun ONS stream.
Create an union input stream that pulls message from a Aliyun ONS stream.
val ssc: StreamingSparkContext = ... val consumerIdTopicTags = Array(("ConsumerID1", "sample-topic1", "*")) val accessKeyId = "kj7aY*******UYx6" val accessKeySecret = "AiNMAlxz*************1PxaPaL8t" val accessKeySecret = "" def func: Message => Array[Byte] = msg => msg.getBody val onsStream = OnsUtils.createStream( ssc, accessKeyId, accessKeySecret, StorageLevel.MEMORY_AND_DISK_2, func) onsStream.foreachRDD(rdd => { ... })
StreamingContext object
Trituple(consumerId, topic, tag)
Aliyun Access Key ID
Aliyun Access Key Secret
Storage level to use for storing the received objects. StorageLevel.MEMORY_AND_DISK_2 is recommended.
Extract information from ONS message