GCP Pub/Sub Integration
To configure the GCP Pub/Sub integration, complete the following steps.

Step 1: Add Device

Follow the steps to connect a device. The device will be used to store the tags that will eventually be used to create outbound topics in the connector. Make sure to select the Enable Data Store checkbox.

Step 2: Add Tags

After connecting the device in Litmus Edge, you can add tags to the device. Create the tags that you want to use to create outbound topics for the connector.

Step 3: Add Google Cloud Pub/Sub Connector

To add the Google Cloud Pub/Sub connector in Litmus Edge:

1. Navigate to Integration.
2. Click the Add a connector icon. The Add a connector dialog box appears.
3. Select the Google Cloud Pub/Sub provider from the drop-down list.
4. Complete the required information for the Google Cloud Pub/Sub connector and click Update.
5. Click the Google Cloud Pub/Sub connector tile. The connector dashboard appears.
6. Click the Topics tab.
7. Click the Import from DeviceHub tags icon and select all the tags whose data you want to send to Databricks.

Step 4: Set Up Databricks for GCP Pub/Sub Streaming

In Databricks, you can configure the GCP Pub/Sub connector parameters from a Python notebook. You need to authenticate using service account credentials and specify the Pub/Sub topic.

To set up Databricks for GCP Pub/Sub streaming:

1. Navigate to your Databricks workspace using your login credentials.
2. Create a new notebook in Databricks. Follow the Databricks Pub/Sub streaming guide (https://docs.databricks.com/en/connect/streaming/pub-sub.html) to set up Google Pub/Sub streaming.
3. Once the data is loaded, the payload sent by Litmus Edge will be in the payload field in binary format.

Example Notebook for GCP Pub/Sub Streams

Note: You can create your own notebooks and customize them as needed. This example notebook explains how to ingest the standard DeviceHub JSON payload. Open a new notebook and follow the steps.

1. Import the necessary libraries and dependencies to handle Pub/Sub data streams and process the data using PySpark.

```python
import pyspark.sql.types as t
import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, BooleanType, TimestampType, LongType
from pyspark.sql.functions import from_unixtime
```

2. Define the credentials and GCP Pub/Sub information that will be used to authenticate and connect to the Pub/Sub service.

Note: End users should use secure methods of passing their Pub/Sub credentials.

```python
# For both the topic and the subscription, pass only the topic and subscription name, not the full path.
# Example: databricks_test
# Not the full path in GCP Pub/Sub, which is "projects/litmus-sales-enablement/topics/databricks_test"
authOptions = {"<gcp pubsub sa key>"}  # replace with a dictionary of service account credential options for the Pub/Sub connector
pubsub_topic = "<gcp pubsub topic>"
pubsub_subscription = "<gcp pubsub sub topic>"
projectId = "<gcp project id>"
```

3. Configure Spark to read data streams from the Pub/Sub topic.

Note: Make sure to include the subscription ID, topic ID, project ID, and authentication options.

```python
df = (spark.readStream
      .format("pubsub")
      .option("subscriptionId", pubsub_subscription)
      .option("topicId", pubsub_topic)
      .option("projectId", projectId)
      .options(**authOptions)
      .load()
     )
```

The resulting streaming DataFrame contains the messageId, payload, attributes, and publishTimestampInMillis columns.

Note: This guide covers the DeviceHub JSON payload. Update the notebook if you are sending a different payload to Databricks.
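Before adding the parsing logic, it can help to confirm that records are actually arriving from Pub/Sub. The snippet below is a minimal sketch, assuming it runs in a Databricks notebook where display() can render a streaming DataFrame; the payload_str column name is illustrative and only added for readability.

```python
# Optional sanity check (sketch): preview the raw stream before adding
# the parsing logic in the next step. The binary payload column is cast
# to a string so the DeviceHub JSON is readable; "payload_str" is just
# an illustrative column name.
display(
    df.withColumn("payload_str", f.col("payload").cast("string"))
)
```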
4. Define the schema that matches the expected structure of the incoming data and parse the Pub/Sub stream data accordingly. This involves decoding the payload, extracting the attributes, and structuring the parsed data into a DataFrame.

```python
schema = StructType([
    StructField("deviceName", StringType(), True),
    StructField("tagName", StringType(), True),
    StructField("deviceID", StringType(), True),
    StructField("success", BooleanType(), True),
    StructField("datatype", StringType(), True),
    StructField("timestamp", LongType(), True),
    StructField("value", DoubleType(), True),
    StructField("metadata", StringType(), True),
    StructField("registerId", StringType(), True),
    StructField("description", StringType(), True)
])

table = "<databricks table name>"

query = (df
         .withColumn("payload", f.decode(f.col("payload"), "UTF-8"))
         .withColumn("attributes", f.from_json(f.col("attributes"), t.MapType(t.StringType(), t.StringType())))
         .withColumn("parsed_value", f.from_json(f.col("payload").cast("string"), schema))
         .withColumn("le_timestamp", from_unixtime(f.col("parsed_value.timestamp") / 1000, "yyyy-MM-dd HH:mm:ss.SSS"))
         .select("messageId", "attributes", "publishTimestampInMillis", "parsed_value.*", "le_timestamp")
         .writeStream
         .format("delta")
         .outputMode("append")
         .option("checkpointLocation", "/tmp/delta/" + table + "/_checkpoints/")
         .toTable(table)
        )
```

5. Once the data is published to the Databricks table, you can query the table to verify that the data ingestion process is successful.
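One way to perform this verification is sketched below. It reuses the table variable defined above and assumes the streaming query has already written at least one micro-batch to the Delta table.

```python
# Verification sketch: read a few rows back from the Delta table that the
# streaming query writes to, to confirm records are landing as expected.
display(spark.table(table).limit(10))

# A row count is another quick signal that data is flowing.
print(spark.table(table).count())
```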