How to create a DataFrame from a CSV in Spark (using Scala) when the first line is the schema?
I am new to Spark, coding in Scala. I want to read a file from HDFS or S3 and convert it into a Spark DataFrame. The first line of the CSV file is the schema. How can I create a DataFrame when the schema has an unknown number of columns? I am using the following piece of code to create a DataFrame for a known schema.
def loadData(path: String): DataFrame = {
  val rdd = sc.textFile(path)
  val firstLine = rdd.first()
  // Use the header line as the schema: one StringType column per field
  val schema = StructType(firstLine.split(',').map(fieldName => StructField(fieldName, StringType, true)))
  // Skip the header row in the first partition
  val noHeader = rdd.mapPartitionsWithIndex(
    (i, iterator) =>
      if (i == 0 && iterator.hasNext) {
        iterator.next
        iterator
      } else iterator)
  val rowRDD = noHeader.map(_.split(",")).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5)))
  val dataFrame = sqlContext.createDataFrame(rowRDD, schema)
  dataFrame
}
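The only part of this that pins the column count is the hard-coded Row(p(0), ..., p(5)). For reference, a minimal sketch of the generalization, assuming the same sc and sqlContext are in scope; every column comes out as StringType, since the header alone carries no type information:

import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.types.{StructType, StructField, StringType}

def loadData(path: String): DataFrame = {
  val rdd = sc.textFile(path)
  // The header line supplies the column names, however many there are
  val schema = StructType(rdd.first().split(',').map(fieldName => StructField(fieldName, StringType, true)))
  val noHeader = rdd.mapPartitionsWithIndex(
    (i, iterator) =>
      if (i == 0 && iterator.hasNext) { iterator.next; iterator } else iterator)
  // Row.fromSeq adapts to any number of columns
  val rowRDD = noHeader.map(line => Row.fromSeq(line.split(",").toSeq))
  sqlContext.createDataFrame(rowRDD, schema)
}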
You can try the following code, dear Hammad:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
val sqlCon = new SQLContext(sc)

// Comma-separated list of columnName:type
def main(args: Array[String]) {
  var schemaString = "id:int,firstname:text,lastname:text,email:string,country:text"
  val schema = StructType(
    schemaString.split(",").map(fieldName =>
      StructField(fieldName.split(":")(0), getFieldTypeInSchema(fieldName.split(":")(1)), true)))
  val rdd = sc.textFile("/users.csv")
  // Skip the header row in the first partition
  val noHeader = rdd.mapPartitionsWithIndex(
    (i, iterator) =>
      if (i == 0 && iterator.hasNext) {
        iterator.next
        iterator
      } else iterator)
  // Cast each token to the type declared for its column in the schema
  val rowRDDx = noHeader.map(p => {
    var list: collection.mutable.Seq[Any] = collection.mutable.Seq.empty[Any]
    var index = 0
    var tokens = p.split(",")
    tokens.foreach(value => {
      var valType = schema.fields(index).dataType
      var returnVal: Any = null
      valType match {
        case IntegerType => returnVal = value.toString.toInt
        case DoubleType => returnVal = value.toString.toDouble
        case LongType => returnVal = value.toString.toLong
        case FloatType => returnVal = value.toString.toFloat
        case ByteType => returnVal = value.toString.toByte
        case StringType => returnVal = value.toString
        case TimestampType => returnVal = value.toString
      }
      list = list :+ returnVal
      index += 1
    })
    Row.fromSeq(list)
  })
  val df = sqlCon.applySchema(rowRDDx, schema)
}

// Map a type name from the schema string to a Spark SQL DataType
def getFieldTypeInSchema(ftype: String): DataType = {
  ftype match {
    case "int" => IntegerType
    case "double" => DoubleType
    case "long" => LongType
    case "float" => FloatType
    case "byte" => ByteType
    case "string" => StringType
    case "date" => TimestampType
    case "timestamp" => StringType
    case "uuid" => StringType
    case "decimal" => DoubleType
    case "boolean" => BooleanType
    case "counter" => IntegerType
    case "bigint" => IntegerType
    case "text" => StringType
    case "ascii" => StringType
    case "varchar" => StringType
    case "varint" => IntegerType
    case _ => StringType
  }
}
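For a quick check of the above (the path and file contents here are just an assumption for illustration), a /users.csv matching the schemaString might look like:

id,firstname,lastname,email,country
1,John,Doe,john@example.com,US
2,Jane,Roe,jane@example.com,UK

With that input, id is parsed as an Int and the remaining columns stay Strings. Note that on Spark 1.3+, sqlCon.createDataFrame(rowRDDx, schema) is the non-deprecated equivalent of applySchema.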
Hope it helps you. :)