Version: 1.1.0

kafkaload

Synopsis

starlake kafkaload [options]

Description

Two modes are available: batch mode and streaming mode.

Batch mode

In batch mode, you run the kafka (off)loader regularly and the last consumed offsets are stored in the comet_offsets topic (see reference-kafka.conf for an example configuration).
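
As a minimal sketch of such a configuration (the key names below are assumptions; reference-kafka.conf remains the authoritative example):

```hocon
# Illustrative sketch only -- key names are assumptions; see reference-kafka.conf
# for the authoritative layout.
kafka {
  # comet-offsets-mode = "FILE"   # alternative offset storage, as in the second figure below
  topics {
    "comet_offsets" {
      # Hypothetical access options: plain Kafka client settings.
      access-options {
        "bootstrap.servers" = "localhost:9092"
      }
    }
  }
}
```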

When offloading data from Kafka to a file, you may ask to coalesce the result into a specific number of files/partitions. If you coalesce to a single partition, the offloader stores the data under the exact filename you provided in the path argument.
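
For instance, a hypothetical batch run that coalesces to a single partition (so the output lands at the exact filename given in --path) might look like this; the topic config name and file path are illustrative:

```bash
# Hypothetical invocation: "my_topic" is an assumed topic config name.
# Coalescing to 1 makes the offloader write to the exact file named in --path.
starlake kafkaload \
  --config my_topic \
  --path /tmp/offload.json \
  --format json \
  --write-coalesce 1
```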

The figure below describes the batch offloading process.

The figure below describes the batch offloading process with comet-offsets-mode = "FILE".

Streaming mode

In this mode, the program keeps running and the comet_offsets topic is not used. The (off)loader uses the consumer group id you specify in the access options of the topic configuration you are working with.
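
As a sketch (topic and key names are assumptions), the consumer group id would sit in the topic's access options:

```hocon
# Hypothetical topic entry: in streaming mode the consumer group id below is
# used instead of the comet_offsets topic.
topics {
  "my_topic" {
    access-options {
      "group.id" = "starlake-offloader"      # consumer group used by the (off)loader
      "bootstrap.servers" = "localhost:9092"
    }
  }
}
```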

Parameters

| Parameter | Cardinality | Description |
| --- | --- | --- |
| --config:\<value\> | Optional | Topic name declared in the reference.conf file |
| --connectionRef:\<value\> | Optional | Connection to any specific sink |
| --format:\<value\> | Optional | Read/write format, e.g. parquet, json, csv. Defaults to parquet |
| --path:\<value\> | Optional | Source file for load and target file for store |
| --options:\<value\> | Optional | Options to pass to the Spark reader |
| --write-config:\<value\> | Optional | Topic name declared in the reference.conf file |
| --write-path:\<value\> | Optional | Source file for load and target file for store |
| --write-mode:\<value\> | Optional | When offload is true, describes how data should be stored on disk. Ignored if offload is false |
| --write-options:\<value\> | Optional | Options to pass to the Spark writer |
| --write-format:\<value\> | Optional | Streaming format, e.g. kafka, console |
| --write-coalesce:\<value\> | Optional | Should we coalesce the resulting dataframe, and into how many partitions |
| --transform:\<value\> | Optional | Any transformation to apply to a message before loading/offloading it |
| --stream:\<value\> | Optional | Should we use streaming mode? |
| --streaming-trigger:\<value\> | Optional | Once / Continuous / ProcessingTime |
| --streaming-trigger-option:\<value\> | Optional | e.g. "10 seconds"; see https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/Trigger.html#ProcessingTime-java.lang.String- |
| --streaming-to-table:\<value\> | Optional | Table name to sink to |
| --streaming-partition-by:\<value\> | Optional | List of columns to use for partitioning |
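
To illustrate how the streaming flags combine, here is a hypothetical run (the topic config and table names are assumptions) that fires a micro-batch every 10 seconds and sinks the messages into a table:

```bash
# Hypothetical streaming run; flag names come from the table above, values are examples.
starlake kafkaload \
  --config my_topic \
  --stream true \
  --streaming-trigger ProcessingTime \
  --streaming-trigger-option "10 seconds" \
  --streaming-to-table my_table
```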

Samples

Batch offload topic from kafka to a file

Assume we want to periodically offload an avro topic to disk, creating a filename that contains the date and time the batch was started. We need to provide the following input parameters to the starlake batch (a command-line sketch follows the list):

  • offload: We set it to true since we are consuming data from kafka
  • mode: Overwrite, since we are creating a unique file for each starlake batch
  • path: The file path where the consumed data will be stored. Any standard starlake variable can be used here, for example /tmp/file-{{sl_datetime}}.txt
  • format: We may save it in any Spark-supported format (parquet, text, json ...)
  • coalesce: Write all consumed messages into a single file if set to 1
  • config: The config entry in application.conf describing the topic connection options. See below.
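
A command-line sketch matching the bullets above might look like this (flag names are taken from the Parameters table; the offload and mode options are assumed to map onto the topic config and --write-mode, and the topic config name is hypothetical):

```bash
# Hypothetical invocation for the periodic avro offload described above.
starlake kafkaload \
  --config my_avro_topic \
  --path "/tmp/file-{{sl_datetime}}.txt" \
  --format parquet \
  --write-mode Overwrite \
  --write-coalesce 1
```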