How to create a DataFrame from a CSV file in Spark (using Scala) when the first line is the schema? -


I am new to Spark and am coding in Scala. I want to read a file from HDFS or S3 and convert it into a Spark DataFrame. The first line of the CSV file is the schema. How can I create a DataFrame when the schema has an unknown number of columns? I am using the following piece of code to create a DataFrame for a known schema.

/**
  * Loads a CSV file (HDFS/S3 path) into a DataFrame, treating the first
  * line of the file as a header of column names. Every column is typed as
  * a nullable StringType.
  *
  * NOTE(review): relies on `sc` (SparkContext) and `sqlContext` being in
  * scope in the enclosing program, as in the original snippet.
  *
  * @param path path to the CSV file (HDFS, S3, or local)
  * @return a DataFrame whose schema is derived from the header line
  */
def loadData(path: String): DataFrame = {
  val rdd = sc.textFile(path)
  // The first record of the file is the header row with the column names.
  val firstLine = rdd.first()
  // Build a schema of nullable string fields from the header names.
  val schema = StructType(
    firstLine.split(',').map(fieldName => StructField(fieldName, StringType, nullable = true)))
  // Drop the header: skip the first record of partition 0 only.
  val noHeader = rdd.mapPartitionsWithIndex { (i, iterator) =>
    if (i == 0 && iterator.hasNext) {
      iterator.next()
      iterator
    } else iterator
  }
  // The original hard-coded exactly six columns (p(0)..p(5)), defeating the
  // stated goal of handling an unknown column count; Row.fromSeq accepts
  // however many columns the header declared.
  val rowRdd = noHeader.map(_.split(",")).map(tokens => Row.fromSeq(tokens.toSeq))
  sqlContext.createDataFrame(rowRdd, schema)
}

You can try the following code, dear Hammad:

// Spark context and SQL context for this example application.
val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
val sqlCon = new SQLContext(sc)

/**
  * Maps a textual type name (from a "columnName:type" schema string) to a
  * Spark SQL DataType. Any unrecognized name defaults to StringType.
  *
  * NOTE(review): the original mapped "date" -> TimestampType but
  * "timestamp" -> StringType; that asymmetry is preserved here, but it
  * looks suspicious — confirm the intent.
  */
def getFieldTypeInSchema(fType: String): DataType = fType match {
  case "int"       => IntegerType
  case "double"    => DoubleType
  case "long"      => LongType
  case "float"     => FloatType
  case "byte"      => ByteType
  case "string"    => StringType
  case "date"      => TimestampType
  case "timestamp" => StringType
  case "uuid"      => StringType
  case "decimal"   => DoubleType
  case "boolean"   => BooleanType
  case "counter"   => IntegerType
  case "bigint"    => IntegerType
  case "text"      => StringType
  case "ascii"     => StringType
  case "varchar"   => StringType
  case "varint"    => IntegerType
  case _           => StringType
}

def main(args: Array[String]): Unit = {
  // Comma-separated list of columnName:type pairs describing the CSV.
  val schemaString = "id:int,firstName:text,lastName:text,email:string,country:text"
  // Parse each "name:type" pair once instead of splitting it twice as the
  // original did.
  val schema = StructType(
    schemaString.split(",").map { pair =>
      val Array(name, typeName) = pair.split(":")
      StructField(name, getFieldTypeInSchema(typeName), nullable = true)
    })
  val rdd = sc.textFile("/users.csv")
  // Drop the header line (first record of partition 0 only).
  val noHeader = rdd.mapPartitionsWithIndex { (i, iterator) =>
    if (i == 0 && iterator.hasNext) {
      iterator.next()
      iterator
    } else iterator
  }
  // Convert each CSV line to a Row, coercing each token to the declared
  // column type. The original used `var returnval: = null` (a syntax
  // error), a mutable Seq[Any] accumulator, and a non-exhaustive match
  // that would throw MatchError on any type not listed; rewritten as a
  // pure zip/map with a safe fallback to the raw string.
  val rowRddX = noHeader.map { line =>
    val values = line.split(",").zip(schema.fields).map { case (token, field) =>
      field.dataType match {
        case IntegerType   => token.toInt
        case DoubleType    => token.toDouble
        case LongType      => token.toLong
        case FloatType     => token.toFloat
        case ByteType      => token.toByte
        case StringType    => token
        case TimestampType => token // original kept timestamp columns as raw strings
        case _             => token // fallback: keep the raw token
      }
    }
    Row.fromSeq(values.toSeq)
  }
  // applySchema is deprecated; createDataFrame is the modern equivalent
  // with identical behavior for an RDD[Row] plus an explicit schema.
  val df = sqlCon.createDataFrame(rowRddX, schema)
}

Hope it helps you. :)


Comments

Popular posts from this blog

javascript - Using jquery append to add option values into a select element not working -

Android soft keyboard reverts to default keyboard on orientation change -

Rendering JButton to get the JCheckBox behavior in a JTable by using images does not update my table -