Kafka Integration
To configure the Kafka integration, complete the following steps.

## Step 1: Add Device

Follow the steps to Connect a Device. The device will be used to store the tags that are eventually used to create outbound topics in the connector. Make sure to select the **Enable Data Store** checkbox.

## Step 2: Add Tags

After connecting the device in Litmus Edge, you can Add Tags to the device. Create the tags that you want to use to create outbound topics for the connector.

## Step 3: Add Kafka Connector

To add the Kafka connector in Litmus Edge:

1. Navigate to **Integration**.
2. Click the **Add a connector** icon. The **Add a connector** dialog box appears.
3. Select the Kafka SSL provider from the drop-down menu. In this guide, we are using Confluent Kafka as the Kafka broker, so we selected **Kafka SSL Integration**.
4. Complete the required information for the Kafka SSL connector and click **Add/Update**.
5. Click the Kafka SSL connector tile. The connector dashboard appears.
6. Click the **Topics** tab.
7. Click the **Import from DeviceHub tags** icon and select all the tags whose data you want to send to Databricks.

## Step 4: Set Up Databricks for Kafka Streaming

In Databricks, you configure the Kafka connection parameters from a Python notebook. You will need the broker address, the topic name, and the API key and secret to set up Databricks for Kafka streaming.

1. Navigate to your Databricks workspace using your login credentials.
2. Create a new notebook in Databricks. Follow the Databricks Kafka streaming guide (https://docs.databricks.com/en/connect/streaming/kafka.html) to set up Kafka streaming.
3. Once the data is loaded, the payload sent by Litmus Edge will be in the `value` field in binary format.

## Example Notebook for Kafka Streams

Note: You can create your own notebooks and customize them as needed. This example notebook explains how to ingest the standard DeviceHub JSON payload.

Open a new notebook and follow these steps.

1. Import the necessary libraries and dependencies to handle Kafka data streams and process data using PySpark.

```python
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType, LongType
from pyspark.sql.functions import from_unixtime, from_json, col
```

2. Define the credentials and Kafka broker information that will be used to authenticate and connect to the Kafka server.

Note: Use a secure method of passing your Kafka credentials, for example a Databricks secret scope, rather than hard-coding them in the notebook.

```python
confluentBootstrapServer = "<kafka broker server>"
kafkaTopic = "<kafka topic>"
apiKey = "<API key>"
apiSecret = "<API secret>"
```

3. Configure Spark to read the data stream from the Kafka topic.

Note: Make sure to include the Kafka server, security protocol, and authentication details.

```python
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", confluentBootstrapServer) \
    .option("kafka.security.protocol", "SASL_SSL") \
    .option("kafka.sasl.mechanism", "PLAIN") \
    .option("kafka.sasl.jaas.config", f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="{apiKey}" password="{apiSecret}";') \
    .option("subscribe", kafkaTopic) \
    .option("startingOffsets", "earliest") \
    .load()
```

This returns a streaming DataFrame with the standard Kafka columns: `key`, `value`, `topic`, `partition`, `offset`, `timestamp`, and `timestampType`.

Note: This guide covers the DeviceHub JSON payload. Update the notebook if you are sending a different payload to Databricks.
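Before defining a schema in the next step, it can help to preview a few raw messages and confirm the payload fields. The snippet below is a minimal sketch: the variable and column names are only for illustration, `display()` is the Databricks notebook helper for rendering results, and the sample payload in the comment is representative of the DeviceHub format, so verify it against your own tags.

```python
# Optional: preview the raw stream to confirm the DeviceHub JSON structure
# before defining a schema. The Kafka 'value' column is binary, so cast it
# to a string to make the JSON readable.
raw_preview = df.selectExpr("CAST(value AS STRING) AS json_payload", "topic", "timestamp")

# In a Databricks notebook, display() renders the streaming preview as a table.
display(raw_preview)

# A DeviceHub tag message typically looks similar to this (field names are
# representative; verify them against your own payload):
# {
#   "deviceID": "...",
#   "deviceName": "...",
#   "tagName": "...",
#   "success": true,
#   "datatype": "float64",
#   "timestamp": 1718000000000,
#   "value": 42.5,
#   "registerId": "...",
#   "description": "",
#   "metadata": ""
# }
```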
4. Define a schema that matches the expected structure of the incoming data, then parse the Kafka stream data accordingly.

```python
# Define the schema for the DeviceHub JSON payload
schema = StructType([
    StructField("deviceName", StringType()),
    StructField("tagName", StringType()),
    StructField("deviceID", StringType()),
    StructField("success", BooleanType()),
    StructField("datatype", StringType()),
    StructField("timestamp", LongType()),
    StructField("value", DoubleType()),
    StructField("metadata", StringType()),
    StructField("registerId", StringType()),
    StructField("description", StringType())
])

# Convert the binary 'value' column to a string, then parse the JSON data
parsed_df = (df
    .withColumnsRenamed({"timestamp": "kafka_time"})
    .withColumn("parsed_value", from_json(col("value").cast("string"), schema))
    .withColumn("le_timestamp", from_unixtime(F.col("parsed_value.timestamp") / 1000, "yyyy-MM-dd HH:mm:ss.SSS"))
    .select("key", "topic", "kafka_time", "partition", "offset", "parsed_value.*", "le_timestamp")
)
```

This produces a DataFrame containing the Kafka metadata columns (`key`, `topic`, `kafka_time`, `partition`, `offset`), the parsed DeviceHub fields, and the converted `le_timestamp` column.

5. Specify the Databricks table where the parsed data will be stored.

```python
table = "<databricks table name>"

query = (parsed_df.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/tmp/delta/" + table + "/_checkpoints/")
    .toTable(table)
)
```

6. Once the data is published to the Databricks table, you can query the table to verify that the data ingestion process was successful.
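As a quick verification for step 6, you can read the Delta table back in a notebook cell and inspect the most recent rows. This is a minimal sketch that reuses the `table` variable, the `col` import, and the `le_timestamp` column defined above; adjust the selected columns to match your own schema.

```python
# Verify ingestion by reading the Delta table and showing the newest rows.
latest_rows = (
    spark.table(table)
    .select("deviceName", "tagName", "value", "le_timestamp")
    .orderBy(col("le_timestamp").desc())
    .limit(10)
)

# display() renders the result as a table in a Databricks notebook;
# latest_rows.show() works in any Spark environment.
display(latest_rows)
```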