Column filtering in PySpark
I have a DataFrame df loaded from a Hive table. It has a timestamp column, ts, stored as a string in the format dd-MMM-yy hh.mm.ss.ms a (which, expressed with the Python datetime library, is %d-%b-%y %I.%M.%S.%f %p).
Now I want to filter the rows of the DataFrame down to the last 5 minutes:

only_last_5_minutes = df.filter(
    datetime.strptime(df.ts, '%d-%b-%y %I.%M.%S.%f %p') > datetime.now() - timedelta(minutes=5)
)
However, this does not work, and I get the message:

TypeError: strptime() argument 1 must be string, not Column
It looks like I am applying the column operation incorrectly. It seems to me that I have to create a lambda function to filter each row against the desired condition, but being a newbie to Python, and to lambda expressions in particular, I don't know how to write the filter correctly. Please advise.
P.S. I would prefer to express the filter in native Python (or Spark SQL) rather than inside a Hive SQL WHERE clause.
Preferred:

df = sqlContext.sql("SELECT * FROM my_table")
df.filter(...)  # filter here
Not preferred:

df = sqlContext.sql("SELECT * FROM my_table WHERE ...")
It is possible to use a user-defined function:
from datetime import datetime, timedelta
from pyspark.sql.types import BooleanType, TimestampType
from pyspark.sql.functions import udf, col

def in_last_5_minutes(now):
    def _in_last_5_minutes(then):
        then_parsed = datetime.strptime(then, '%d-%b-%y %I.%M.%S.%f %p')
        return then_parsed > now - timedelta(minutes=5)
    return udf(_in_last_5_minutes, BooleanType())
Using some dummy data:
df = sqlContext.createDataFrame([
    (1, '14-Jul-15 11.34.29.000000 AM'),
    (2, '14-Jul-15 11.34.27.000000 AM'),
    (3, '14-Jul-15 11.32.11.000000 AM'),
    (4, '14-Jul-15 11.29.00.000000 AM'),
    (5, '14-Jul-15 11.28.29.000000 AM')
], ('id', 'datetime'))

now = datetime(2015, 7, 14, 11, 35)
df.where(in_last_5_minutes(now)(col("datetime"))).show()
And as expected we get 3 entries:
+--+--------------------+
|id|            datetime|
+--+--------------------+
| 1|14-Jul-15 11.34.2...|
| 2|14-Jul-15 11.34.2...|
| 3|14-Jul-15 11.32.1...|
+--+--------------------+
Parsing the datetime string over and over again is rather inefficient, so you may consider storing a TimestampType instead.
def parse_dt():
    def _parse(dt):
        return datetime.strptime(dt, '%d-%b-%y %I.%M.%S.%f %p')
    return udf(_parse, TimestampType())

df_with_timestamp = df.withColumn("timestamp", parse_dt()(df.datetime))

def in_last_5_minutes(now):
    def _in_last_5_minutes(then):
        return then > now - timedelta(minutes=5)
    return udf(_in_last_5_minutes, BooleanType())

df_with_timestamp.where(in_last_5_minutes(now)(col("timestamp")))
And the result:
+--+--------------------+--------------------+
|id|            datetime|           timestamp|
+--+--------------------+--------------------+
| 1|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 2|14-Jul-15 11.34.2...|2015-07-14 11:34:...|
| 3|14-Jul-15 11.32.1...|2015-07-14 11:32:...|
+--+--------------------+--------------------+
Finally, it is possible to use a raw SQL query with timestamps:
query = """select * df unix_timestamp(datetime, 'dd-mmm-yy hh.mm.ss.ssssss a') > {0} """.format(time.mktime((now - timedelta(minutes=5)).timetuple())) sqlcontext.sql(query)
This is the same as above, but more efficient, since the date strings are parsed only once.
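Note that for the raw SQL variant the DataFrame has to be visible to the SQL context under the name used in the query. A minimal sketch, assuming a Spark 1.x sqlContext (the table name "df" simply mirrors the query above):

# Register the DataFrame as a temporary table so the raw SQL query
# above can refer to it by name (Spark 1.x DataFrame API).
df.registerTempTable("df")

sqlContext.sql(query).show()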
If the column is already a timestamp, it is possible to use datetime literals:
from pyspark.sql.functions import lit

df_with_timestamp.where(
    df_with_timestamp.timestamp > lit(now - timedelta(minutes=5)))
Edit:

Since Spark 1.5 you can parse the date string as follows:
from pyspark.sql.functions import from_unixtime, unix_timestamp
from pyspark.sql.types import TimestampType

df.select(from_unixtime(unix_timestamp(
    df.datetime, "dd-MMM-yy hh.mm.ss.SSSSSS aa"
)).cast(TimestampType()).alias("datetime"))
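Putting the pieces together, here is a minimal end-to-end sketch of the original filter without any Python UDF, assuming Spark 1.5+ and the dummy data above; the names ts_parsed and only_last_5_minutes are only illustrative:

from datetime import datetime, timedelta
from pyspark.sql.functions import from_unixtime, unix_timestamp, lit, col
from pyspark.sql.types import TimestampType

# Parse the string column on the JVM side instead of in a Python UDF ...
df_parsed = df.withColumn(
    "ts_parsed",
    from_unixtime(
        unix_timestamp(df.datetime, "dd-MMM-yy hh.mm.ss.SSSSSS aa")
    ).cast(TimestampType()))

# ... and compare it against a Python datetime literal.
now = datetime(2015, 7, 14, 11, 35)  # or datetime.now()
only_last_5_minutes = df_parsed.where(
    col("ts_parsed") > lit(now - timedelta(minutes=5)))

only_last_5_minutes.show()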