org.apache.spark.streaming.aliyun.logservice

LoghubUtils

object LoghubUtils

Various utility classes for working with Aliyun LogService.

Preparation:

Before receiving log data, you need to create the two MySQL tables used by loghub:

Create Table if not exists loghub_lease
(
  `auto_id` int(10) not null auto_increment,
  `consume_group` varchar(64),
  `logstream_sig` varchar(64),
  `shard_id` varchar(64),
  `lease_id` int(20),
  `lease_owner` varchar(64),
  `consumer_owner` varchar(64),
  `update_time` datetime,
  `checkpoint` text,
  PRIMARY KEY(`auto_id`),
  UNIQUE KEY(`consume_group`,`logstream_sig`, `shard_id`)
)ENGINE = InnoDB DEFAULT CHARSET=utf8;

Create Table if not exists loghub_worker
(
  `auto_id` int(10) not null auto_increment,
  `consume_group` varchar(64),
  `logstream_sig` varchar(64),
  `instance_name` varchar(64),
  `update_time` datetime,
  PRIMARY KEY(`auto_id`),
  UNIQUE KEY(`consume_group`, `logstream_sig`, `instance_name`)
  )ENGINE = InnoDB DEFAULT CHARSET=utf8;
Linear Supertypes
AnyRef, Any
Ordering
  1. Alphabetic
  2. By inheritance
Inherited
  1. LoghubUtils
  2. AnyRef
  3. Any
  1. Hide All
  2. Show all
Learn more about member selection
Visibility
  1. Public
  2. All

Value Members

  1. final def !=(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  2. final def !=(arg0: Any): Boolean

    Definition Classes
    Any
  3. final def ##(): Int

    Definition Classes
    AnyRef → Any
  4. final def ==(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  5. final def ==(arg0: Any): Boolean

    Definition Classes
    Any
  6. final def asInstanceOf[T0]: T0

    Definition Classes
    Any
  7. def clone(): AnyRef

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  8. def createStream(ssc: StreamingContext, mysqlHost: String, mysqlPort: Int, mysqlDatabase: String, mysqlUser: String, mysqlPwd: String, logServiceProject: String, logStoreName: String, loghubConsumerGroupName: String, loghubEndpoint: String, numReceivers: Int, accessKeyId: String, accessKeySecret: String, storageLevel: StorageLevel): DStream[Array[Byte]]

    Create loghub DStream.

    Create loghub DStream.

    val dbHost = "localhost"
    val dbPort = 3306
    val db = "sample-db"
    val dbUser = "tom"
    var pwd = "123456"
    val loghubProject = "sample-project"
    val logStream = "sample-logstore"
    val loghubGroupName = "sample-group"
    val endpoint = "cn-hangzhou-intranet.sls.aliyuncs.com"
    val accessKeyId = "kj7aY*******UYx6"
    val accessKeySecret = "AiNMAlxz*************1PxaPaL8t"
    val numReceivers = 2
    val batchInterval = Milliseconds(5 * 1000)
    
    val conf = new SparkConf().setAppName("Test Loghub")
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
      ssc,
      dbHost,
      dbPort,
      db,
      dbUser,
      pwd,
      loghubProject,
      logStream,
      loghubGroupName,
      endpoint,
      numReceivers,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK)
    ssc

    StreamingContext.

    mysqlHost

    The host of Mysql needed by loghub.

    mysqlPort

    The port of Mysql needed by loghub.

    mysqlDatabase

    The name of mysql database.

    mysqlUser

    The username of mysql.

    mysqlPwd

    The password of mysql.

    logServiceProject

    The name of LogService project

    logStoreName

    The name of logStore.

    loghubConsumerGroupName

    The group name of the loghub consumer. All consumer processes that share the same group name will consume the specified logStore together.

    loghubEndpoint

    The endpoint of loghub.

    numReceivers

    The number of receivers.

    accessKeyId

    The Aliyun AccessKeyId.

    accessKeySecret

    The Aliyun AccessKeySecret.

    storageLevel

    The storage level.

    returns

    Annotations
    @Experimental()
  9. def createStream(ssc: StreamingContext, mysqlHost: String, mysqlPort: Int, mysqlDatabase: String, mysqlUser: String, mysqlPwd: String, mysqlWorkerInstanceTableName: String, mysqlShardLeaseTableName: String, logServiceProject: String, logStoreName: String, loghubConsumerGroupName: String, loghubEndpoint: String, numReceivers: Int, accessKeyId: String, accessKeySecret: String, storageLevel: StorageLevel): DStream[Array[Byte]]

    Create loghub DStream.

    Create loghub DStream.

    val dbHost = "localhost"
    val dbPort = 3306
    val db = "sample-db"
    val dbUser = "tom"
    var pwd = "123456"
    val instanceWorker = "loghub_worker"
    val lease = "loghub_lease"
    val loghubProject = "sample-project"
    val logStream = "sample-logstore"
    val loghubGroupName = "sample-group"
    val endpoint = "cn-hangzhou-intranet.sls.aliyuncs.com"
    val accessKeyId = "kj7aY*******UYx6"
    val accessKeySecret = "AiNMAlxz*************1PxaPaL8t"
    val numReceivers = 2
    val batchInterval = Milliseconds(5 * 1000)
    
    val conf = new SparkConf().setAppName("Test Loghub")
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
      ssc,
      dbHost,
      dbPort,
      db,
      dbUser,
      pwd,
      instanceWorker,
      lease,
      loghubProject,
      logStream,
      loghubGroupName,
      endpoint,
      numReceivers,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK)
    ssc

    StreamingContext.

    mysqlHost

    The host of Mysql needed by loghub.

    mysqlPort

    The port of Mysql needed by loghub.

    mysqlDatabase

    The name of mysql database.

    mysqlUser

    The username of mysql.

    mysqlPwd

    The password of mysql.

    mysqlWorkerInstanceTableName

    The name of WorkInstance table, such as "loghub_worker" above.

    mysqlShardLeaseTableName

    The name of ShardLease table, such as "loghub_lease" above.

    logServiceProject

    The name of LogService project

    logStoreName

    The name of logStore.

    loghubConsumerGroupName

    The group name of the loghub consumer. All consumer processes that share the same group name will consume the specified logStore together.

    loghubEndpoint

    The endpoint of loghub.

    numReceivers

    The number of receivers.

    accessKeyId

    The Aliyun AccessKeyId.

    accessKeySecret

    The Aliyun AccessKeySecret.

    storageLevel

    The storage level.

    returns

    Annotations
    @Experimental()
  10. def createStream(ssc: StreamingContext, mysqlHost: String, mysqlPort: Int, mysqlDatabase: String, mysqlUser: String, mysqlPwd: String, logServiceProject: String, logStoreName: String, loghubConsumerGroupName: String, loghubEndpoint: String, accessKeyId: String, accessKeySecret: String, storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]]

    Create loghub DStream.

    Create loghub DStream.

    val dbHost = "localhost"
    val dbPort = 3306
    val db = "sample-db"
    val dbUser = "tom"
    var pwd = "123456"
    val instanceWorker = "loghub_worker"
    val lease = "loghub_lease"
    val loghubProject = "sample-project"
    val logStream = "sample-logstore"
    val loghubGroupName = "sample-group"
    val endpoint = "cn-hangzhou-intranet.sls.aliyuncs.com"
    val accessKeyId = "kj7aY*******UYx6"
    val accessKeySecret = "AiNMAlxz*************1PxaPaL8t"
    val numReceivers = 2
    val batchInterval = Milliseconds(5 * 1000)
    
    val conf = new SparkConf().setAppName("Test Loghub")
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
      ssc,
      dbHost,
      dbPort,
      db,
      dbUser,
      pwd,
      loghubProject,
      logStream,
      loghubGroupName,
      endpoint,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK)
    ssc

    StreamingContext.

    mysqlHost

    The host of Mysql needed by loghub.

    mysqlPort

    The port of Mysql needed by loghub.

    mysqlDatabase

    The name of mysql database.

    mysqlUser

    The username of mysql.

    mysqlPwd

    The password of mysql.

    logServiceProject

    The name of LogService project

    logStoreName

    The name of logStore.

    loghubConsumerGroupName

    The group name of the loghub consumer. All consumer processes that share the same group name will consume the specified logStore together.

    loghubEndpoint

    The endpoint of loghub.

    accessKeyId

    The Aliyun AccessKeyId.

    accessKeySecret

    The Aliyun AccessKeySecret.

    storageLevel

    The storage level.

    returns

    Annotations
    @Experimental()
  11. def createStream(ssc: StreamingContext, mysqlHost: String, mysqlPort: Int, mysqlDatabase: String, mysqlUser: String, mysqlPwd: String, mysqlWorkerInstanceTableName: String, mysqlShardLeaseTableName: String, logServiceProject: String, logStoreName: String, loghubConsumerGroupName: String, loghubEndpoint: String, accessKeyId: String, accessKeySecret: String, storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]]

    val dbHost = "localhost"
    val dbPort = 3306
    val db = "sample-db"
    val dbUser = "tom"
    var pwd = "123456"
    val instanceWorker = "loghub_worker"
    val lease = "loghub_lease"
    val loghubProject = "sample-project"
    val logStream = "sample-logstore"
    val loghubGroupName = "sample-group"
    val endpoint = "cn-hangzhou-intranet.sls.aliyuncs.com"
    val accessKeyId = "kj7aY*******UYx6"
    val accessKeySecret = "AiNMAlxz*************1PxaPaL8t"
    val batchInterval = Milliseconds(5 * 1000)
    
    val conf = new SparkConf().setAppName("Test Loghub")
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
      ssc,
      dbHost,
      dbPort,
      db,
      dbUser,
      pwd,
      instanceWorker,
      lease,
      loghubProject,
      logStream,
      loghubGroupName,
      endpoint,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK)
    ssc

    StreamingContext.

    mysqlHost

    The host of Mysql needed by loghub.

    mysqlPort

    The port of Mysql needed by loghub.

    mysqlDatabase

    The name of mysql database.

    mysqlUser

    The username of mysql.

    mysqlPwd

    The password of mysql.

    mysqlWorkerInstanceTableName

    The name of WorkInstance table, such as "loghub_worker" above.

    mysqlShardLeaseTableName

    The name of ShardLease table, such as "loghub_lease" above.

    logServiceProject

    The name of LogService project

    logStoreName

    The name of logStore.

    loghubConsumerGroupName

    The group name of the loghub consumer. All consumer processes that share the same group name will consume the specified logStore together.

    loghubEndpoint

    The endpoint of loghub.

    accessKeyId

    The Aliyun AccessKeyId.

    accessKeySecret

    The Aliyun AccessKeySecret.

    storageLevel

    The storage level.

    returns

    Annotations
    @Experimental()
  12. final def eq(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  13. def equals(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  14. def finalize(): Unit

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  15. final def getClass(): Class[_]

    Definition Classes
    AnyRef → Any
  16. def hashCode(): Int

    Definition Classes
    AnyRef → Any
  17. final def isInstanceOf[T0]: Boolean

    Definition Classes
    Any
  18. final def ne(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  19. final def notify(): Unit

    Definition Classes
    AnyRef
  20. final def notifyAll(): Unit

    Definition Classes
    AnyRef
  21. final def synchronized[T0](arg0: ⇒ T0): T0

    Definition Classes
    AnyRef
  22. def toString(): String

    Definition Classes
    AnyRef → Any
  23. final def wait(): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  24. final def wait(arg0: Long, arg1: Int): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  25. final def wait(arg0: Long): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )

Deprecated Value Members

  1. def createStream(ssc: StreamingContext, mysqlHost: String, mysqlPort: Int, mysqlDatabase: String, mysqlUser: String, mysqlPwd: String, mysqlWorkerInstanceTableName: String, mysqlShardLeaseTableName: String, logServiceProject: String, logStoreName: String, loghubConsumerGroupName: String, loghubInstanceNameBase: String, loghubEndpoint: String, numReceivers: Int, accessKeyId: String, accessKeySecret: String, storageLevel: StorageLevel): DStream[Array[Byte]]

    Create loghub DStream.

    Create loghub DStream.

    val dbHost = "localhost"
    val dbPort = 3306
    val db = "sample-db"
    val dbUser = "tom"
    var pwd = "123456"
    val instanceWorker = "loghub_worker"
    val lease = "loghub_lease"
    val loghubProject = "sample-project"
    val logStream = "sample-logstore"
    val loghubGroupName = "sample-group"
    val instanceNameBase = "sample-namebase"
    val endpoint = "cn-hangzhou-intranet.sls.aliyuncs.com"
    val accessKeyId = "kj7aY*******UYx6"
    val accessKeySecret = "AiNMAlxz*************1PxaPaL8t"
    val numReceivers = 2
    val batchInterval = Milliseconds(5 * 1000)
    
    val conf = new SparkConf().setAppName("Test Loghub")
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
      ssc,
      dbHost,
      dbPort,
      db,
      dbUser,
      pwd,
      instanceWorker,
      lease,
      loghubProject,
      logStream,
      loghubGroupName,
      instanceNameBase,
      endpoint,
      numReceivers,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK)
    ssc

    StreamingContext.

    mysqlHost

    The host of Mysql needed by loghub.

    mysqlPort

    The port of Mysql needed by loghub.

    mysqlDatabase

    The name of mysql database.

    mysqlUser

    The username of mysql.

    mysqlPwd

    The password of mysql.

    mysqlWorkerInstanceTableName

    The name of WorkInstance table, such as "loghub_worker" above.

    mysqlShardLeaseTableName

    The name of ShardLease table, such as "loghub_lease" above.

    logServiceProject

    The name of LogService project

    logStoreName

    The name of logStore.

    loghubConsumerGroupName

    The group name of the loghub consumer. All consumer processes that share the same group name will consume the specified logStore together.

    loghubInstanceNameBase

    The name base of each loghub instance.

    loghubEndpoint

    The endpoint of loghub.

    numReceivers

    The number of receivers.

    accessKeyId

    The Aliyun AccessKeyId.

    accessKeySecret

    The Aliyun AccessKeySecret.

    storageLevel

    The storage level.

    returns

    Annotations
    @Experimental() @deprecated
    Deprecated

    (Since version 1.0.5) No need to provide "loghubInstanceNameBase" argument

  2. def createStream(ssc: StreamingContext, mysqlHost: String, mysqlPort: Int, mysqlDatabase: String, mysqlUser: String, mysqlPwd: String, mysqlWorkerInstanceTableName: String, mysqlShardLeaseTableName: String, logServiceProject: String, logStoreName: String, loghubConsumerGroupName: String, loghubInstanceNameBase: String, loghubEndpoint: String, accessKeyId: String, accessKeySecret: String, storageLevel: StorageLevel): ReceiverInputDStream[Array[Byte]]

    Create loghub DStream.

    Create loghub DStream.

    val dbHost = "localhost"
    val dbPort = 3306
    val db = "sample-db"
    val dbUser = "tom"
    var pwd = "123456"
    val instanceWorker = "loghub_worker"
    val lease = "loghub_lease"
    val loghubProject = "sample-project"
    val logStream = "sample-logstore"
    val loghubGroupName = "sample-group"
    val instanceNameBase = "sample-namebase"
    val endpoint = "cn-hangzhou-intranet.sls.aliyuncs.com"
    val accessKeyId = "kj7aY*******UYx6"
    val accessKeySecret = "AiNMAlxz*************1PxaPaL8t"
    val batchInterval = Milliseconds(5 * 1000)
    
    val conf = new SparkConf().setAppName("Test Loghub")
    val ssc = new StreamingContext(conf, batchInterval)
    val loghubStream = LoghubUtils.createStream(
      ssc,
      dbHost,
      dbPort,
      db,
      dbUser,
      pwd,
      instanceWorker,
      lease,
      loghubProject,
      logStream,
      loghubGroupName,
      instanceNameBase,
      endpoint,
      accessKeyId,
      accessKeySecret,
      StorageLevel.MEMORY_AND_DISK)
    ssc

    StreamingContext.

    mysqlHost

    The host of Mysql needed by loghub.

    mysqlPort

    The port of Mysql needed by loghub.

    mysqlDatabase

    The name of mysql database.

    mysqlUser

    The username of mysql.

    mysqlPwd

    The password of mysql.

    mysqlWorkerInstanceTableName

    The name of WorkInstance table, such as "loghub_worker" above.

    mysqlShardLeaseTableName

    The name of ShardLease table, such as "loghub_lease" above.

    logServiceProject

    The name of LogService project

    logStoreName

    The name of logStore.

    loghubConsumerGroupName

    The group name of the loghub consumer. All consumer processes that share the same group name will consume the specified logStore together.

    loghubInstanceNameBase

    The name base of each loghub instance.

    loghubEndpoint

    The endpoint of loghub.

    accessKeyId

    The Aliyun AccessKeyId.

    accessKeySecret

    The Aliyun AccessKeySecret.

    storageLevel

    The storage level.

    returns

    Annotations
    @Experimental() @deprecated
    Deprecated

    (Since version 1.0.5) No need to provide "loghubInstanceNameBase" argument

Inherited from AnyRef

Inherited from Any

Ungrouped