Scala - Spark Streaming fileStream
I'm programming with Spark Streaming but I'm having trouble with Scala. I'm trying to use the function StreamingContext.fileStream.
The definition of this function is:
    def fileStream[K, V, F <: InputFormat[K, V]](directory: String)(implicit arg0: ClassManifest[K], arg1: ClassManifest[V], arg2: ClassManifest[F]): DStream[(K, V)]

Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. File names starting with . are ignored.

    K          Key type for reading the HDFS file
    V          Value type for reading the HDFS file
    F          Input format for reading the HDFS file
    directory  HDFS directory to monitor for new files
I don't know how to pass the types of the key and value. My code in Spark Streaming:

    val ssc = new StreamingContext(args(0), "streamingreceiver", Seconds(1), System.getenv("SPARK_HOME"), Seq("/home/mesos/streamingreceiver.jar"))
    // Create a NetworkInputDStream on target ip:port and count
    val lines = ssc.fileStream("/home/sequencefile")

Java code to write the Hadoop file:
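    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class MyDriver {
        private static final String[] data = {
            "one, two, buckle shoe",
            "three, four, shut door",
            "five, six, pick sticks",
            "seven, eight, lay them straight",
            "nine, ten, big fat hen"
        };

        public static void main(String[] args) throws IOException {
            String uri = args[0];
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            Path path = new Path(uri);
            // Keys are IntWritable and values are Text
            IntWritable key = new IntWritable();
            Text value = new Text();
            SequenceFile.Writer writer = null;
            try {
                writer = SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());
                for (int i = 0; i < 100; i++) {
                    key.set(100 - i);
                    value.set(data[i % data.length]);
                    System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                    writer.append(key, value);
                }
            } finally {
                // Close the writer even if an append fails
                IOUtils.closeStream(writer);
            }
        }
    }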
If you want to use fileStream, you're going to have to supply all 3 type params when calling it. You need to know what your key, value and InputFormat types are before calling it. If your types were LongWritable, Text and TextInputFormat, you would call fileStream like so:

    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/sequencefile")

If those 3 types do happen to be your types, then you might want to use textFileStream instead, as it does not require any type params and delegates to fileStream using the 3 types mentioned. Using that would look like this:

    val lines = ssc.textFileStream("/home/sequencefile")
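The Java writer in the question produces a SequenceFile with IntWritable keys and Text values, so the LongWritable/Text/TextInputFormat combination would probably not match that data. Below is a minimal sketch of what the call might look like for that file, under some assumptions: that SequenceFileInputFormat from the new Hadoop API (org.apache.hadoop.mapreduce) is the right input format for it, that the Spark version in use has the org.apache.spark.streaming package and awaitTermination (older releases differ), and that /home/sequencefile is a directory into which new files are dropped. The object name SequenceFileStreamSketch is made up for illustration.

```scala
import org.apache.hadoop.io.{IntWritable, Text}
// Assumption: fileStream expects an InputFormat from the new Hadoop API
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical driver object, named for illustration only
object SequenceFileStreamSketch {
  def main(args: Array[String]): Unit = {
    // Master URL comes from the first argument, as in the question
    val ssc = new StreamingContext(args(0), "streamingreceiver", Seconds(1))

    // Supply all three type params: key, value and input format,
    // chosen to match what the Java writer put in the file
    val pairs = ssc.fileStream[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]]("/home/sequencefile")

    // Convert the Writables to plain Scala values before further processing,
    // since Hadoop record readers may reuse Writable instances
    val lines = pairs.map { case (k, v) => (k.get, v.toString) }
    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Note that fileStream monitors a directory for new files, so the writer's output would need to land in that directory after the stream has started for it to be picked up.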