Python – using foreachRDD and foreach to iterate over an rdd in pyspark

apache-spark, pyspark, python

Question for Spark 1.6.1, PySpark

I have streaming data coming in that looks like this:

{"event":4,"Userid":12345,"time":123456789,"device_model":"iPhone OS", "some_other_property": "value", "row_key": 555}

I have a function that writes to HBase called writeToHBase(rdd), which expects an RDD of tuples with the following structure:

(rowkey, [rowkey, column-family, key, value])

As you can see from the input format, I have to take my original dataset and iterate over all keys, sending each key/value pair with a send function call.

From reading the Spark Streaming programming guide, section "Design Patterns for using foreachRDD" (http://spark.apache.org/docs/latest/streaming-programming-guide.html#tab_python_13), it seems that it's recommended to use foreachRDD when doing something external to the dataset. In my case, I want to write data to HBase over the network, so I use foreachRDD on my streaming data and call the function that will handle sending the data:

stream.foreachRDD(lambda k: process(k))

My understanding of Spark functions is pretty limited right now, so I'm unable to figure out a way to iterate over my original dataset and use my write function. If it were a Python iterable, I'd be able to do this:

def process(rdd):
    for key, value in rdd.iteritems():  # works for a dict, not an RDD
        writeToHBase(sc.parallelize([(rowkey, [rowkey, 'column-family', key, value])]))

where rowkey would have been obtained by finding it in the RDD itself:

rdd.map(lambda x: x['rowkey'])

How do I accomplish what process() is meant to do in pyspark? I see some examples use foreach, but I'm not quite able to get it to do what I want.

Best Answer

Why do you want to iterate over the RDD when your writeToHBase function expects an RDD as an argument? Simply call writeToHBase(rdd) in your process function; that's it.
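Given the tuple structure from the question, process can reshape each incoming event dict with flatMap and then hand the whole RDD to the writer in a single call. A sketch under assumptions: the events arrive as parsed dicts, the column-family string comes from the question, and to_hbase_tuples is an illustrative helper, not part of the original code:

```python
def to_hbase_tuples(event):
    # build one (rowkey, [rowkey, column-family, key, value]) tuple
    # per field of the event, excluding the row key itself
    rowkey = event['row_key']
    return [(rowkey, [rowkey, 'column-family', k, v])
            for k, v in event.items() if k != 'row_key']

def process(rdd):
    # reshape the whole RDD of event dicts, then write it in one call
    writeToHBase(rdd.flatMap(to_hbase_tuples))

# wired into the stream exactly as in the question:
# stream.foreachRDD(process)
```

This keeps the transformation lazy and distributed; no data is pulled back to the driver before the write.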

If you need to fetch every record from the RDD, you can call:

def processRecord(record):
    print(record)

rdd.foreach(processRecord)

In the processRecord function you will get a single record to process.
