python - Column filtering in PySpark


I have a DataFrame df loaded from a Hive table. It has a timestamp column, ts, of string type in the format dd-MMM-yy hh.mm.ss.ms a (in terms of the Python datetime library, %d-%b-%y %I.%M.%S.%f %p).
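As a quick illustration (this snippet is mine, using a value shaped like the dummy data further below), that strptime format does parse a single plain Python string of this kind:

from datetime import datetime

# %d day, %b abbreviated month, %y two-digit year, %I 12-hour clock,
# %M minutes, %S seconds, %f microseconds, %p AM/PM marker
datetime.strptime('14-Jul-15 11.34.29.000000 AM', '%d-%b-%y %I.%M.%S.%f %p')
# -> datetime.datetime(2015, 7, 14, 11, 34, 29)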

Now I want to filter the rows of the DataFrame that fall within the last 5 minutes:

only_last_5_minutes = df.filter(
    datetime.strptime(df.ts, '%d-%b-%y %I.%M.%S.%f %p') > datetime.now() - timedelta(minutes=5)
)

However, this does not work and I get the message:

TypeError: strptime() argument 1 must be string, not Column

It looks like I have the wrong application of the column operation, and it seems to me that I have to create a lambda function to filter each column that satisfies the desired condition. Being a newbie to Python, and to lambda expressions in particular, I don't know how to create the filter correctly. Please advise.

P.S. I prefer to express my filters as Python native (or SparkSQL) rather than as a filter inside the Hive SQL query expression 'WHERE'.

Preferred:

df = sqlContext.sql("SELECT * FROM my_table")
df.filter(...)  # filter here

Not preferred:

df = sqlContext.sql("SELECT * FROM my_table WHERE ...")

It is possible to use a user defined function.

from datetime import datetime, timedelta
from pyspark.sql.types import BooleanType, TimestampType
from pyspark.sql.functions import udf, col

# Build a UDF that checks whether a timestamp string is newer than (now - 5 minutes)
def in_last_5_minutes(now):
    def _in_last_5_minutes(then):
        then_parsed = datetime.strptime(then, '%d-%b-%y %I.%M.%S.%f %p')
        return then_parsed > now - timedelta(minutes=5)
    return udf(_in_last_5_minutes, BooleanType())

Using some dummy data:

df = sqlContext.createDataFrame([
    (1, '14-Jul-15 11.34.29.000000 AM'),
    (2, '14-Jul-15 11.34.27.000000 AM'),
    (3, '14-Jul-15 11.32.11.000000 AM'),
    (4, '14-Jul-15 11.29.00.000000 AM'),
    (5, '14-Jul-15 11.28.29.000000 AM')
], ('id', 'datetime'))

now = datetime(2015, 7, 14, 11, 35)
df.where(in_last_5_minutes(now)(col("datetime"))).show()

And as expected we get 3 entries:

+--+--------------------+
|id|            datetime|
+--+--------------------+
| 1|14-Jul-15 11.34.2...|
| 2|14-Jul-15 11.34.2...|
| 3|14-Jul-15 11.32.1...|
+--+--------------------+

Parsing the datetime string over and over again is rather inefficient, so you may consider storing it as a TimestampType instead.

def parse_dt():
    def _parse(dt):
        return datetime.strptime(dt, '%d-%b-%y %I.%M.%S.%f %p')
    return udf(_parse, TimestampType())

# Add a proper timestamp column parsed once from the string column
df_with_timestamp = df.withColumn("timestamp", parse_dt()(df.datetime))

def in_last_5_minutes(now):
    def _in_last_5_minutes(then):
        return then > now - timedelta(minutes=5)
    return udf(_in_last_5_minutes, BooleanType())

df_with_timestamp.where(in_last_5_minutes(now)(col("timestamp")))

And the result:

+--+--------------------+--------------------+
|id|            datetime|           timestamp|
+--+--------------------+--------------------+
| 1|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 2|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 3|14-Jul-15 11.32.1...|2015-07-14 11:32:...|
+--+--------------------+--------------------+

Finally, it is possible to use a raw SQL query with timestamps:

query = """select * df      unix_timestamp(datetime, 'dd-mmm-yy hh.mm.ss.ssssss a') > {0}      """.format(time.mktime((now - timedelta(minutes=5)).timetuple()))  sqlcontext.sql(query) 

This is the same as above, but it would be more efficient to parse the date strings only once.
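For example, one way to combine raw SQL with a single parse is to register the DataFrame that already carries the parsed column and compare against the cutoff directly (a rough sketch reusing df_with_timestamp and now from above; the table name parsed is arbitrary):

# Query the already-parsed timestamp column instead of calling unix_timestamp per row
df_with_timestamp.registerTempTable("parsed")

cutoff = now - timedelta(minutes=5)
sqlContext.sql(
    "SELECT * FROM parsed WHERE timestamp > CAST('{0}' AS TIMESTAMP)".format(cutoff)
).show()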

If the column is already a timestamp, it is possible to use datetime literals:

from pyspark.sql.functions import lit

df_with_timestamp.where(
    df_with_timestamp.timestamp > lit(now - timedelta(minutes=5)))

Edit

Since Spark 1.5 you can parse the date string as follows:

from pyspark.sql.functions import from_unixtime, unix_timestamp
from pyspark.sql.types import TimestampType

df.select((from_unixtime(unix_timestamp(
    df.datetime, "dd-MMM-yy hh.mm.ss.SSSSSS aa"
))).cast(TimestampType()).alias("datetime"))
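Combined with the datetime literal comparison above, a minimal end-to-end sketch (assuming the dummy df and now defined earlier) might look like this:

from datetime import datetime, timedelta
from pyspark.sql.functions import from_unixtime, unix_timestamp, lit
from pyspark.sql.types import TimestampType

# Parse the string column with built-in functions, then filter on the result
parsed = df.withColumn(
    "timestamp",
    from_unixtime(unix_timestamp(df.datetime, "dd-MMM-yy hh.mm.ss.SSSSSS aa"))
        .cast(TimestampType()))

now = datetime(2015, 7, 14, 11, 35)
parsed.where(parsed.timestamp > lit(now - timedelta(minutes=5))).show()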
