Scala - Spark Streaming fileStream


I'm programming with Spark Streaming but I'm having some trouble with Scala. I'm trying to use the function StreamingContext.fileStream.

The definition of this function is:

    def fileStream[K, V, F <: InputFormat[K, V]](directory: String)(implicit arg0: ClassManifest[K], arg1: ClassManifest[V], arg2: ClassManifest[F]): DStream[(K, V)]

Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them using the given key-value types and input format. File names starting with . are ignored.

K: key type for reading the HDFS file
V: value type for reading the HDFS file
F: input format for reading the HDFS file
directory: HDFS directory to monitor for new files

I don't know how to pass the types of the key and value. My code in Spark Streaming:

    val ssc = new StreamingContext(args(0), "streamingreceiver", Seconds(1),
      System.getenv("SPARK_HOME"), Seq("/home/mesos/streamingreceiver.jar"))

    // Create a NetworkInputDStream on target ip:port and count
    val lines = ssc.fileStream("/home/sequencefile")

Java code to write the Hadoop file:

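    import java.io.IOException;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class MyDriver {

        private static final String[] DATA = { "one, two, buckle shoe",
                "three, four, shut door", "five, six, pick sticks",
                "seven, eight, lay them straight", "nine, ten, big fat hen" };

        public static void main(String[] args) throws IOException {
            String uri = args[0];
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(URI.create(uri), conf);
            Path path = new Path(uri);
            // Keys are IntWritable and values are Text in the resulting SequenceFile.
            IntWritable key = new IntWritable();
            Text value = new Text();
            SequenceFile.Writer writer = null;
            try {
                writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),
                        value.getClass());
                for (int i = 0; i < 100; i++) {
                    key.set(100 - i);
                    value.set(DATA[i % DATA.length]);
                    System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,
                            value);
                    writer.append(key, value);
                }
            } finally {
                // Close the writer even if an append fails.
                IOUtils.closeStream(writer);
            }
        }
    }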

If you want to use fileStream, you're going to have to supply all 3 type params when calling it. You need to know what your key, value and InputFormat types are before calling it. If your types were LongWritable, Text and TextInputFormat, you would call fileStream like so:

    val lines = ssc.fileStream[LongWritable, Text, TextInputFormat]("/home/sequencefile")
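The reason the types have to be written out can be read off the signature: K, V and F occur only in the type parameter list and in the implicit arguments, never in the directory argument, so the call site is the only place the compiler can learn them. Below is a minimal plain-Scala sketch of that shape. It is not Spark code: `TypeParamSketch` and `describeStream` are hypothetical names, `java.util.Map` merely stands in for `InputFormat`, and `ClassTag` is used in place of `ClassManifest` on the assumption of a newer Scala version.

```scala
import scala.reflect.ClassTag

object TypeParamSketch {
  // Hypothetical stand-in for fileStream. K, V and F show up only in the
  // type parameter list and the implicit evidence, so a plain String
  // argument tells the compiler nothing about them.
  def describeStream[K, V, F <: java.util.Map[K, V]](directory: String)(
      implicit k: ClassTag[K], v: ClassTag[V], f: ClassTag[F]): String =
    s"$directory: key=${k.runtimeClass.getName}, " +
      s"value=${v.runtimeClass.getName}, " +
      s"format=${f.runtimeClass.getName}"

  def main(args: Array[String]): Unit = {
    // Explicit type arguments let the compiler supply the three ClassTags.
    val description =
      describeStream[java.lang.Long, String, java.util.HashMap[java.lang.Long, String]](
        "/home/sequencefile")
    println(description)
  }
}
```

The implicit evidence is what lets a method like this create instances of the requested classes at runtime, which is why each of the three types has to be a concrete class at the call site.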

If LongWritable, Text and TextInputFormat do happen to be your types, then you might want to use textFileStream instead, as it does not require any type params and delegates to fileStream using those 3 types. Using that would look like this:

    val lines = ssc.textFileStream("/home/sequencefile")
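Note that the Java writer in the question produces a SequenceFile with IntWritable keys and Text values, so for that data the type arguments would more likely be IntWritable, Text and a SequenceFile input format than the text-based ones. The following is only a sketch under some assumptions: a recent Spark release where StreamingContext is built from a SparkConf, and an input format taken from the new Hadoop API (`org.apache.hadoop.mapreduce`). The names `SequenceFileStreamSketch` and `copyOut` are hypothetical.

```scala
import org.apache.hadoop.io.{IntWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SequenceFileStreamSketch {
  // Hadoop record readers reuse Writable instances, so copy the contents
  // into plain Scala values before keeping them around.
  def copyOut(key: IntWritable, value: Text): (Int, String) =
    (key.get, value.toString)

  def main(args: Array[String]): Unit = {
    // Assumption: args(0) is the master URL, as in the question's code.
    val conf = new SparkConf().setAppName("streamingreceiver").setMaster(args(0))
    val ssc = new StreamingContext(conf, Seconds(1))

    // Key, value and input format types match what the Java writer produced.
    val records =
      ssc.fileStream[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](
        "/home/sequencefile")

    val pairs = records.map { case (k, v) => copyOut(k, v) }
    pairs.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Keep in mind that fileStream treats its argument as a directory to watch, so only files that appear in /home/sequencefile after the stream has started would be picked up.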
