#!/usr/bin/env python # coding: utf-8 # Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import _pickle as cPickle import argparse import datetime import hashlib import math import numpy as np import pandas as pd import pyspark.sql.functions as F import time from pyspark.context import SparkContext, SparkConf from pyspark.ml.linalg import SparseVector, VectorUDT from pyspark.sql.session import SparkSession from pyspark.sql.types import IntegerType, StringType, StructType, StructField, TimestampType, FloatType, ArrayType evaluation_verbose = False OUTPUT_BUCKET_FOLDER = "/outbrain/preprocessed/" DATA_BUCKET_FOLDER = "/outbrain/orig/" SPARK_TEMP_FOLDER = "/outbrain/spark-temp/" conf = SparkConf().setMaster('local[*]').set('spark.executor.memory', '40g').set('spark.driver.memory', '200g').set( "spark.local.dir", SPARK_TEMP_FOLDER) sc = SparkContext(conf=conf) spark = SparkSession(sc) start_time = time.time() def hashstr(s, nr_bins): return int(hashlib.md5(s.encode('utf8')).hexdigest(), 16) % (nr_bins - 1) + 1 parser = argparse.ArgumentParser() parser.add_argument( '--submission', action='store_true', default=False ) args = parser.parse_args() evaluation = not args.submission # ## UDFs def date_time_to_unix_epoch(date_time): return int(time.mktime(date_time.timetuple())) def date_time_to_unix_epoch_treated(dt): if dt is not None: try: epoch = date_time_to_unix_epoch(dt) return epoch except Exception as e: print("Error processing dt={}".format(dt), e) return 0 else: return 0 timestamp_null_to_zero_int_udf = F.udf(lambda x: date_time_to_unix_epoch_treated(x), IntegerType()) INT_DEFAULT_NULL_VALUE = -1 int_null_to_minus_one_udf = F.udf(lambda x: x if x is not None else INT_DEFAULT_NULL_VALUE, IntegerType()) int_list_null_to_empty_list_udf = F.udf(lambda x: x if x is not None else [], ArrayType(IntegerType())) float_list_null_to_empty_list_udf = F.udf(lambda x: x if x is not None else [], ArrayType(FloatType())) str_list_null_to_empty_list_udf = F.udf(lambda x: x if x is not None else [], ArrayType(StringType())) def truncate_day_from_timestamp(ts): return int(ts / 1000 / 60 / 60 / 24) truncate_day_from_timestamp_udf = F.udf(lambda ts: truncate_day_from_timestamp(ts), IntegerType()) extract_country_udf = F.udf(lambda geo: geo.strip()[:2] if geo is not None else '', StringType()) extract_country_state_udf = F.udf(lambda geo: geo.strip()[:5] if geo is not None else '', StringType()) list_len_udf = F.udf(lambda x: len(x) if x is not None else 0, IntegerType()) def convert_odd_timestamp(timestamp_ms_relative): TIMESTAMP_DELTA = 1465876799998 return datetime.datetime.fromtimestamp((int(timestamp_ms_relative) + TIMESTAMP_DELTA) // 1000) # # Loading Files # ## Loading UTC/BST for each country and US / CA states (local time) country_utc_dst_df = pd.read_csv('preproc/data/country_codes_utc_dst_tz_delta.csv', keep_default_na=False) countries_utc_dst_dict = dict( zip(country_utc_dst_df['country_code'].tolist(), country_utc_dst_df['utc_dst_time_offset_cleaned'].tolist())) countries_utc_dst_broad = sc.broadcast(countries_utc_dst_dict) us_states_utc_dst_df = pd.read_csv('preproc/data/us_states_abbrev_bst.csv', keep_default_na=False) us_states_utc_dst_dict = dict( zip(us_states_utc_dst_df['state_abb'].tolist(), us_states_utc_dst_df['utc_dst_time_offset_cleaned'].tolist())) us_states_utc_dst_broad = sc.broadcast(us_states_utc_dst_dict) ca_states_utc_dst_df = pd.read_csv('preproc/data/ca_states_abbrev_bst.csv', keep_default_na=False) ca_countries_utc_dst_dict = dict( zip(ca_states_utc_dst_df['state_abb'].tolist(), ca_states_utc_dst_df['utc_dst_time_offset_cleaned'].tolist())) ca_countries_utc_dst_broad = sc.broadcast(ca_countries_utc_dst_dict) # ## Loading competition csvs events_schema = StructType( [StructField("display_id", IntegerType(), True), StructField("uuid_event", StringType(), True), StructField("document_id_event", IntegerType(), True), StructField("timestamp_event", IntegerType(), True), StructField("platform_event", IntegerType(), True), StructField("geo_location_event", StringType(), True)] ) events_df = spark.read.schema(events_schema) \ .options(header='true', inferschema='false', nullValue='\\N') \ .csv(DATA_BUCKET_FOLDER + "events.csv") \ .withColumn('dummyEvents', F.lit(1)) \ .withColumn('day_event', truncate_day_from_timestamp_udf('timestamp_event')) \ .withColumn('event_country', extract_country_udf('geo_location_event')) \ .withColumn('event_country_state', extract_country_state_udf('geo_location_event')) \ .alias('events') events_df.count() # Drop rows with empty "geo_location" events_df = events_df.dropna(subset="geo_location_event") events_df.count() # Drop rows with empty "platform" events_df = events_df.dropna(subset="platform_event") events_df.count() page_views_schema = StructType( [StructField("uuid_pv", StringType(), True), StructField("document_id_pv", IntegerType(), True), StructField("timestamp_pv", IntegerType(), True), StructField("platform_pv", IntegerType(), True), StructField("geo_location_pv", StringType(), True), StructField("traffic_source_pv", IntegerType(), True)] ) page_views_df = spark.read.schema(page_views_schema) \ .options(header='true', inferschema='false', nullValue='\\N') \ .csv(DATA_BUCKET_FOLDER + "page_views.csv") \ .withColumn('day_pv', truncate_day_from_timestamp_udf('timestamp_pv')) \ .alias('page_views') page_views_df.createOrReplaceTempView('page_views') page_views_users_df = spark.sql(''' SELECT uuid_pv, document_id_pv, max(timestamp_pv) as max_timestamp_pv, 1 as dummyPageView FROM page_views p GROUP BY uuid_pv, document_id_pv ''').alias('page_views_users') promoted_content_schema = StructType( [StructField("ad_id", IntegerType(), True), StructField("document_id_promo", IntegerType(), True), StructField("campaign_id", IntegerType(), True), StructField("advertiser_id", IntegerType(), True)] ) promoted_content_df = spark.read.schema(promoted_content_schema) \ .options(header='true', inferschema='false', nullValue='\\N') \ .csv(DATA_BUCKET_FOLDER + "promoted_content.csv") \ .withColumn('dummyPromotedContent', F.lit(1)).alias('promoted_content').cache() documents_meta_schema = StructType( [StructField("document_id_doc", IntegerType(), True), StructField("source_id", IntegerType(), True), StructField("publisher_id", IntegerType(), True), StructField("publish_time", TimestampType(), True)] ) documents_meta_df = spark.read.schema(documents_meta_schema) \ .options(header='true', inferschema='false', nullValue='\\N') \ .csv(DATA_BUCKET_FOLDER + "documents_meta.csv") \ .withColumn('dummyDocumentsMeta', F.lit(1)).alias('documents_meta').cache() documents_meta_df.count() # Drop rows with empty "source_id" documents_meta_df = documents_meta_df.dropna(subset="source_id") documents_meta_df.count() source_publishers_df = documents_meta_df.select(["source_id", "publisher_id"]).dropDuplicates() source_publishers_df.count() # get list of source_ids without publisher_id rows_no_pub = source_publishers_df.filter("publisher_id is NULL") source_ids_without_publisher = [row['source_id'] for row in rows_no_pub.collect()] len(source_ids_without_publisher) # maximum value of publisher_id used so far max_pub = max(source_publishers_df.select(["publisher_id"]).dropna().collect())['publisher_id'] max_pub # rows filled with new publisher_ids new_publishers = [(source, max_pub + 1 + nr) for nr, source in enumerate(source_ids_without_publisher)] new_publishers_df = spark.createDataFrame(new_publishers, ("source_id", "publisher_id")) new_publishers_df.take(10) # old and new publishers merged fixed_source_publishers_df = source_publishers_df.dropna().union(new_publishers_df) fixed_source_publishers_df.collect()[-30:] # update documents_meta with bew publishers documents_meta_df = documents_meta_df.drop('publisher_id').join(fixed_source_publishers_df, on='source_id') documents_meta_df.count() # Joining with Page Views to get traffic_source_pv events_joined_df = events_df.join(documents_meta_df .withColumnRenamed('source_id', 'source_id_doc_event') .withColumnRenamed('publisher_id', 'publisher_doc_event') .withColumnRenamed('publish_time', 'publish_time_doc_event'), on=F.col("document_id_event") == F.col("document_id_doc"), how='left') \ .join(page_views_df, on=[F.col('uuid_event') == F.col('uuid_pv'), F.col('document_id_event') == F.col('document_id_pv'), F.col('platform_event') == F.col('platform_pv'), F.col('geo_location_event') == F.col('geo_location_pv'), F.col('day_event') == F.col('day_pv')], how='left') \ .alias('events').cache() documents_categories_schema = StructType( [StructField("document_id_cat", IntegerType(), True), StructField("category_id", IntegerType(), True), StructField("confidence_level_cat", FloatType(), True)] ) documents_categories_df = spark.read.schema(documents_categories_schema) \ .options(header='true', inferschema='false', nullValue='\\N') \ .csv(DATA_BUCKET_FOLDER + "documents_categories.csv") \ .alias('documents_categories').cache() documents_categories_grouped_df = documents_categories_df.groupBy('document_id_cat') \ .agg(F.collect_list('category_id').alias('category_id_list'), F.collect_list('confidence_level_cat').alias('confidence_level_cat_list')) \ .withColumn('dummyDocumentsCategory', F.lit(1)) \ .alias('documents_categories_grouped') documents_topics_schema = StructType( [StructField("document_id_top", IntegerType(), True), StructField("topic_id", IntegerType(), True), StructField("confidence_level_top", FloatType(), True)] ) documents_topics_df = spark.read.schema(documents_topics_schema) \ .options(header='true', inferschema='false', nullValue='\\N') \ .csv(DATA_BUCKET_FOLDER + "documents_topics.csv") \ .alias('documents_topics').cache() documents_topics_grouped_df = documents_topics_df.groupBy('document_id_top') \ .agg(F.collect_list('topic_id').alias('topic_id_list'), F.collect_list('confidence_level_top').alias('confidence_level_top_list')) \ .withColumn('dummyDocumentsTopics', F.lit(1)) \ .alias('documents_topics_grouped') documents_entities_schema = StructType( [StructField("document_id_ent", IntegerType(), True), StructField("entity_id", StringType(), True), StructField("confidence_level_ent", FloatType(), True)] ) documents_entities_df = spark.read.schema(documents_entities_schema) \ .options(header='true', inferschema='false', nullValue='\\N') \ .csv(DATA_BUCKET_FOLDER + "documents_entities.csv") \ .alias('documents_entities').cache() documents_entities_grouped_df = documents_entities_df.groupBy('document_id_ent') \ .agg(F.collect_list('entity_id').alias('entity_id_list'), F.collect_list('confidence_level_ent').alias('confidence_level_ent_list')) \ .withColumn('dummyDocumentsEntities', F.lit(1)) \ .alias('documents_entities_grouped') clicks_train_schema = StructType( [StructField("display_id", IntegerType(), True), StructField("ad_id", IntegerType(), True), StructField("clicked", IntegerType(), True)] ) clicks_train_df = spark.read.schema(clicks_train_schema) \ .options(header='true', inferschema='false', nullValue='\\N') \ .csv(DATA_BUCKET_FOLDER + "clicks_train.csv") \ .withColumn('dummyClicksTrain', F.lit(1)).alias('clicks_train') clicks_train_joined_df = clicks_train_df \ .join(promoted_content_df, on='ad_id', how='left') \ .join(documents_meta_df, on=F.col("promoted_content.document_id_promo") == F.col("documents_meta.document_id_doc"), how='left') \ .join(events_joined_df, on='display_id', how='left') clicks_train_joined_df.createOrReplaceTempView('clicks_train_joined') if evaluation: table_name = 'user_profiles_eval' else: table_name = 'user_profiles' user_profiles_df = spark.read.parquet(OUTPUT_BUCKET_FOLDER + table_name) \ .withColumn('dummyUserProfiles', F.lit(1)).alias('user_profiles') # # Spliting Train/validation set | Test set if evaluation: validation_set_exported_df = spark.read.parquet( OUTPUT_BUCKET_FOLDER + "validation_set.parquet") \ .alias('validation_set') validation_set_exported_df.select('display_id').distinct() \ .createOrReplaceTempView("validation_display_ids") validation_set_df = spark.sql(''' SELECT * FROM clicks_train_joined t WHERE EXISTS (SELECT display_id FROM validation_display_ids WHERE display_id = t.display_id)''').alias('clicks') \ .join(documents_categories_grouped_df, on=F.col("document_id_promo") == F.col("documents_categories_grouped.document_id_cat"), how='left') \ .join(documents_topics_grouped_df, on=F.col("document_id_promo") == F.col("documents_topics_grouped.document_id_top"), how='left') \ .join(documents_entities_grouped_df, on=F.col("document_id_promo") == F.col("documents_entities_grouped.document_id_ent"), how='left') \ .join(documents_categories_grouped_df .withColumnRenamed('category_id_list', 'doc_event_category_id_list') .withColumnRenamed('confidence_level_cat_list', 'doc_event_confidence_level_cat_list') .alias('documents_event_categories_grouped'), on=F.col("document_id_event") == F.col("documents_event_categories_grouped.document_id_cat"), how='left') \ .join(documents_topics_grouped_df .withColumnRenamed('topic_id_list', 'doc_event_topic_id_list') .withColumnRenamed('confidence_level_top_list', 'doc_event_confidence_level_top_list') .alias('documents_event_topics_grouped'), on=F.col("document_id_event") == F.col("documents_event_topics_grouped.document_id_top"), how='left') \ .join(documents_entities_grouped_df .withColumnRenamed('entity_id_list', 'doc_event_entity_id_list') .withColumnRenamed('confidence_level_ent_list', 'doc_event_confidence_level_ent_list') .alias('documents_event_entities_grouped'), on=F.col("document_id_event") == F.col("documents_event_entities_grouped.document_id_ent"), how='left') \ .join(page_views_users_df, on=[F.col("clicks.uuid_event") == F.col("page_views_users.uuid_pv"), F.col("clicks.document_id_promo") == F.col("page_views_users.document_id_pv")], how='left') # print("validation_set_df.count() =", validation_set_df.count()) # Added to validation set information about the event and the user for statistics of the error (avg ctr) validation_set_ground_truth_df = validation_set_df.filter('clicked = 1') \ .join(user_profiles_df, on=[F.col("user_profiles.uuid") == F.col("uuid_event")], how='left') \ .withColumn('user_categories_count', list_len_udf('category_id_list')) \ .withColumn('user_topics_count', list_len_udf('topic_id_list')) \ .withColumn('user_entities_count', list_len_udf('entity_id_list')) \ .select('display_id', 'ad_id', 'platform_event', 'day_event', 'timestamp_event', 'geo_location_event', 'event_country', 'event_country_state', 'views', 'user_categories_count', 'user_topics_count', 'user_entities_count') \ .withColumnRenamed('ad_id', 'ad_id_gt') \ .withColumnRenamed('views', 'user_views_count') \ .cache() # print("validation_set_ground_truth_df.count() =", validation_set_ground_truth_df.count()) train_set_df = spark.sql(''' SELECT * FROM clicks_train_joined t WHERE NOT EXISTS (SELECT display_id FROM validation_display_ids WHERE display_id = t.display_id)''').cache() print("train_set_df.count() =", train_set_df.count()) # validation_display_ids_df.groupBy("day_event").count().show() else: clicks_test_schema = StructType( [StructField("display_id", IntegerType(), True), StructField("ad_id", IntegerType(), True)] ) clicks_test_df = spark.read.schema(clicks_test_schema) \ .options(header='true', inferschema='false', nullValue='\\N') \ .csv(DATA_BUCKET_FOLDER + "clicks_test.csv") \ .withColumn('dummyClicksTest', F.lit(1)) \ .withColumn('clicked', F.lit(-999)) \ .alias('clicks_test') test_set_df = clicks_test_df \ .join(promoted_content_df, on='ad_id', how='left') \ .join(documents_meta_df, on=F.col("promoted_content.document_id_promo") == F.col("documents_meta.document_id_doc"), how='left') \ .join(documents_categories_grouped_df, on=F.col("document_id_promo") == F.col("documents_categories_grouped.document_id_cat"), how='left') \ .join(documents_topics_grouped_df, on=F.col("document_id_promo") == F.col("documents_topics_grouped.document_id_top"), how='left') \ .join(documents_entities_grouped_df, on=F.col("document_id_promo") == F.col("documents_entities_grouped.document_id_ent"), how='left') \ .join(events_joined_df, on='display_id', how='left') \ .join(documents_categories_grouped_df .withColumnRenamed('category_id_list', 'doc_event_category_id_list') .withColumnRenamed('confidence_level_cat_list', 'doc_event_confidence_level_cat_list') .alias('documents_event_categories_grouped'), on=F.col("document_id_event") == F.col("documents_event_categories_grouped.document_id_cat"), how='left') \ .join(documents_topics_grouped_df .withColumnRenamed('topic_id_list', 'doc_event_topic_id_list') .withColumnRenamed('confidence_level_top_list', 'doc_event_confidence_level_top_list') .alias('documents_event_topics_grouped'), on=F.col("document_id_event") == F.col("documents_event_topics_grouped.document_id_top"), how='left') \ .join(documents_entities_grouped_df .withColumnRenamed('entity_id_list', 'doc_event_entity_id_list') .withColumnRenamed('confidence_level_ent_list', 'doc_event_confidence_level_ent_list') .alias('documents_event_entities_grouped'), on=F.col("document_id_event") == F.col("documents_event_entities_grouped.document_id_ent"), how='left') \ .join(page_views_users_df, on=[F.col("events.uuid_event") == F.col("page_views_users.uuid_pv"), F.col("promoted_content.document_id_promo") == F.col("page_views_users.document_id_pv")], how='left') train_set_df = clicks_train_joined_df.cache() print("train_set_df.count() =", train_set_df.count()) # # Training models def is_null(value): return value is None or len(str(value).strip()) == 0 LESS_SPECIAL_CAT_VALUE = 'less' def get_category_field_values_counts(field, df, min_threshold=10): category_counts = dict(list(filter(lambda x: not is_null(x[0]) and x[1] >= min_threshold, df.select(field).groupBy(field).count().rdd.map( lambda x: (x[0], x[1])).collect()))) # Adding a special value to create a feature for values in this category that are less than min_threshold category_counts[LESS_SPECIAL_CAT_VALUE] = -1 return category_counts # ## Building category values counters and indexers event_country_values_counts = get_category_field_values_counts('event_country', events_df, min_threshold=10) len(event_country_values_counts) # All non-null categories: 230 event_country_state_values_counts = get_category_field_values_counts('event_country_state', events_df, min_threshold=10) len(event_country_state_values_counts) event_geo_location_values_counts = get_category_field_values_counts('geo_location_event', events_df, min_threshold=10) len(event_geo_location_values_counts) # All non-null categories: 2988 doc_entity_id_values_counts = get_category_field_values_counts('entity_id', documents_entities_df, min_threshold=10) len(doc_entity_id_values_counts) # All non-null categories: 1326009 # ## Processing average CTR by categories def get_percentiles(df, field, quantiles_levels=None, max_error_rate=0.0): if quantiles_levels is None: quantiles_levels = np.arange(0.0, 1.1, 0.1).tolist() quantiles = df.approxQuantile(field, quantiles_levels, max_error_rate) return dict(zip(quantiles_levels, quantiles)) # REG = 10 REG = 0 ctr_udf = F.udf(lambda clicks, views: clicks / float(views + REG), FloatType()) # ### Average CTR by ad_id ad_id_popularity_df = train_set_df.groupby('ad_id').agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) # ad_id_popularity_df.count() # get_percentiles(ad_id_popularity_df, 'clicks') # get_percentiles(ad_id_popularity_df, 'views') ad_id_popularity = ad_id_popularity_df.filter('views > 5').select('ad_id', 'ctr', 'views') \ .rdd.map(lambda x: (x['ad_id'], (x['ctr'], x['views'], 1, 1))).collectAsMap() ad_id_popularity_broad = sc.broadcast(ad_id_popularity) list(ad_id_popularity.values())[:3] len(ad_id_popularity) # get_ad_id_ctr_udf = F.udf(lambda ad_id: ad_id_popularity[ad_id] if ad_id in ad_id_popularity else -1, FloatType()) ad_id_avg_ctr = sum(map(lambda x: x[0], ad_id_popularity.values())) / float(len(ad_id_popularity)) ad_id_avg_ctr ad_id_weighted_avg_ctr = sum(map(lambda x: x[0] * x[1], ad_id_popularity.values())) / float( sum(map(lambda x: x[1], ad_id_popularity.values()))) ad_id_weighted_avg_ctr ad_id_views_median = np.median(np.array(list(map(lambda x: x[1], ad_id_popularity.values())))) ad_id_views_median ad_id_views_mean = sum(map(lambda x: x[1], ad_id_popularity.values())) / float(len(ad_id_popularity)) ad_id_views_mean # ### Average CTR by document_id (promoted_content) document_id_popularity_df = train_set_df \ .groupby('document_id_promo') \ .agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) document_id_popularity = document_id_popularity_df.filter('views > 5') \ .select('document_id_promo', 'ctr', 'views', 'distinct_ad_ids') \ .rdd.map(lambda x: (x['document_id_promo'], (x['ctr'], x['views'], x['distinct_ad_ids'], 1))).collectAsMap() len(document_id_popularity) document_id_popularity_broad = sc.broadcast(document_id_popularity) # document_id_popularity_df.count() # get_percentiles(document_id_popularity_df, 'clicks') # get_percentiles(document_id_popularity_df, 'views') document_id_avg_ctr = sum(map(lambda x: x[0], document_id_popularity.values())) / float(len(document_id_popularity)) document_id_avg_ctr document_id_weighted_avg_ctr = sum(list(map(lambda x: x[0] * x[1], document_id_popularity.values()))) / float( sum(list(map(lambda x: x[1], document_id_popularity.values())))) document_id_weighted_avg_ctr document_id_views_median = np.median(np.array(list(map(lambda x: x[1], document_id_popularity.values())))) document_id_views_median document_id_views_mean = sum(map(lambda x: x[1], document_id_popularity.values())) / float(len(document_id_popularity)) document_id_views_mean # ### Average CTR by (doc_event, doc_ad) doc_event_doc_ad_avg_ctr_df = train_set_df.groupBy('document_id_event', 'document_id_promo') \ .agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) doc_event_doc_ad_avg_ctr = doc_event_doc_ad_avg_ctr_df.filter('views > 5') \ .select('document_id_event', 'document_id_promo', 'ctr', 'views', 'distinct_ad_ids') \ .rdd.map(lambda x: ((x['document_id_event'], x['document_id_promo']), (x['ctr'], x['views'], x['distinct_ad_ids'], 1))).collectAsMap() len(doc_event_doc_ad_avg_ctr) doc_event_doc_ad_avg_ctr_broad = sc.broadcast(doc_event_doc_ad_avg_ctr) # ### Average CTR by country, source_id source_id_by_country_popularity_df = train_set_df \ .select('clicked', 'source_id', 'event_country', 'ad_id') \ .groupby('event_country', 'source_id') \ .agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) # source_id_popularity = source_id_popularity_df # .filter('views > 100 and source_id is not null') # .select('source_id', 'ctr') # .rdd.collectAsMap() source_id_by_country_popularity = source_id_by_country_popularity_df.filter( 'views > 5 and source_id is not null and event_country <> ""').select('event_country', 'source_id', 'ctr', 'views', 'distinct_ad_ids').rdd.map( lambda x: ((x['event_country'], x['source_id']), (x['ctr'], x['views'], x['distinct_ad_ids'], 1))).collectAsMap() len(source_id_by_country_popularity) source_id_by_country_popularity_broad = sc.broadcast(source_id_by_country_popularity) source_id_by_country_avg_ctr = sum(map(lambda x: x[0], source_id_by_country_popularity.values())) / float( len(source_id_by_country_popularity)) source_id_by_country_avg_ctr source_id_by_country_weighted_avg_ctr = sum( map(lambda x: x[0] * x[1], source_id_by_country_popularity.values())) / float( sum(map(lambda x: x[1], source_id_by_country_popularity.values()))) source_id_by_country_weighted_avg_ctr source_id_by_country_views_median = np.median( np.array(list(map(lambda x: x[1], source_id_by_country_popularity.values())))) source_id_by_country_views_median source_id_by_country_views_mean = sum(map(lambda x: x[1], source_id_by_country_popularity.values())) / float( len(source_id_by_country_popularity)) source_id_by_country_views_mean # ### Average CTR by source_id source_id_popularity_df = train_set_df.select('clicked', 'source_id', 'ad_id') \ .groupby('source_id').agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) source_id_popularity = source_id_popularity_df \ .filter('views > 10 and source_id is not null') \ .select('source_id', 'ctr', 'views', 'distinct_ad_ids') \ .rdd.map(lambda x: (x['source_id'], (x['ctr'], x['views'], x['distinct_ad_ids'], 1))) \ .collectAsMap() len(source_id_popularity) source_id_popularity_broad = sc.broadcast(source_id_popularity) # source_id_popularity_df.count() # get_percentiles(source_id_popularity_df, 'clicks') # get_percentiles(source_id_popularity_df, 'views') # source_id_popularity = source_id_popularity_df # .filter('views > 100 and source_id is not null') # .select('source_id', 'ctr') # .rdd.collectAsMap() # ### Average CTR by publisher_id publisher_popularity_df = train_set_df.select('clicked', 'publisher_id', 'ad_id') \ .groupby('publisher_id').agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) publisher_popularity = publisher_popularity_df \ .filter('views > 10 and publisher_id is not null') \ .select('publisher_id', 'ctr', 'views', 'distinct_ad_ids') \ .rdd.map(lambda x: (x['publisher_id'], (x['ctr'], x['views'], x['distinct_ad_ids'], 1))) \ .collectAsMap() len(publisher_popularity) publisher_popularity_broad = sc.broadcast(publisher_popularity) # publisher_popularity_df.count() # ##863 # get_percentiles(publisher_popularity_df, 'clicks') # get_percentiles(publisher_popularity_df, 'views') # publisher_id_popularity = publisher_popularity_df # .filter('views > 100 and publisher_id is not null') # .select('publisher_id', 'ctr') # .rdd.collectAsMap() # len(publisher_id_popularity) # ##639 # ### Average CTR by advertiser_id advertiser_id_popularity_df = train_set_df.select('clicked', 'advertiser_id', 'ad_id') \ .groupby('advertiser_id').agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) advertiser_id_popularity = advertiser_id_popularity_df \ .filter('views > 10 and advertiser_id is not null') \ .select('advertiser_id', 'ctr', 'views', 'distinct_ad_ids') \ .rdd.map(lambda x: (x['advertiser_id'], (x['ctr'], x['views'], x['distinct_ad_ids'], 1))).collectAsMap() len(advertiser_id_popularity) advertiser_id_popularity_broad = sc.broadcast(advertiser_id_popularity) # advertiser_id_popularity_df.count() # ##4063 # get_percentiles(advertiser_id_popularity_df, 'clicks') # get_percentiles(advertiser_id_popularity_df, 'views') # advertiser_id_popularity = advertiser_id_popularity_df # .filter('views > 100 and advertiser_id is not null') # .select('advertiser_id', 'ctr') # .rdd.collectAsMap() # len(advertiser_id_popularity) # ##3129 # ### Average CTR by campaign_id campaign_id_popularity_df = train_set_df.select('clicked', 'campaign_id', 'ad_id') \ .groupby('campaign_id').agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) campaign_id_popularity = campaign_id_popularity_df \ .filter('views > 10 and campaign_id is not null') \ .select('campaign_id', 'ctr', 'views', 'distinct_ad_ids') \ .rdd.map(lambda x: (x['campaign_id'], (x['ctr'], x['views'], x['distinct_ad_ids'], 1))) \ .collectAsMap() len(campaign_id_popularity) campaign_id_popularity_broad = sc.broadcast(campaign_id_popularity) # campaign_id_popularity_df.count() # ##31390 # get_percentiles(campaign_id_popularity_df, 'clicks') # get_percentiles(campaign_id_popularity_df, 'views') # campaign_id_popularity = campaign_id_popularity_df # .filter('views > 100 and campaign_id is not null') # .select('campaign_id', 'ctr') # .rdd.collectAsMap() # len(campaign_id_popularity) # ##16097 # ### Average CTR by category category_id_popularity_df = train_set_df.join( documents_categories_df.alias('cat_local'), on=F.col("document_id_promo") == F.col("cat_local.document_id_cat"), how='inner') \ .select('clicked', 'category_id', 'confidence_level_cat', 'ad_id') \ .groupby('category_id').agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.mean('confidence_level_cat').alias('avg_confidence_level_cat'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) category_id_popularity = category_id_popularity_df.filter('views > 10') \ .select('category_id', 'ctr', 'views', 'avg_confidence_level_cat', 'distinct_ad_ids') \ .rdd.map(lambda x: (x['category_id'], (x['ctr'], x['views'], x['distinct_ad_ids'], x['avg_confidence_level_cat']))).collectAsMap() len(category_id_popularity) category_id_popularity_broad = sc.broadcast(category_id_popularity) list(category_id_popularity.values())[:10] np.median(np.array(list(map(lambda x: x[1], category_id_popularity.values())))) sum(map(lambda x: x[1], category_id_popularity.values())) / float(len(category_id_popularity)) # Parece haver uma hierarquia nas categorias pelo padrão dos códigos... # category_id_popularity # ### Average CTR by (country, category) category_id_by_country_popularity_df = train_set_df \ .join(documents_categories_df.alias('cat_local'), on=F.col("document_id_promo") == F.col("cat_local.document_id_cat"), how='inner') \ .select('clicked', 'category_id', 'confidence_level_cat', 'event_country', 'ad_id') \ .groupby('event_country', 'category_id').agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.mean('confidence_level_cat').alias('avg_confidence_level_cat'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) category_id_by_country_popularity = category_id_by_country_popularity_df \ .filter('views > 10 and event_country <> ""') \ .select('event_country', 'category_id', 'ctr', 'views', 'avg_confidence_level_cat', 'distinct_ad_ids') \ .rdd.map(lambda x: ((x['event_country'], x['category_id']), (x['ctr'], x['views'], x['distinct_ad_ids'], x['avg_confidence_level_cat']))).collectAsMap() len(category_id_by_country_popularity) category_id_by_country_popularity_broad = sc.broadcast(category_id_by_country_popularity) # ### Average CTR by Topic topic_id_popularity_df = train_set_df.join( documents_topics_df.alias('top_local'), on=F.col("document_id_promo") == F.col("top_local.document_id_top"), how='inner') \ .select('clicked', 'topic_id', 'confidence_level_top', 'ad_id') \ .groupby('topic_id').agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.mean('confidence_level_top').alias('avg_confidence_level_top'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) topic_id_popularity = topic_id_popularity_df.filter('views > 10') \ .select('topic_id', 'ctr', 'views', 'avg_confidence_level_top', 'distinct_ad_ids') \ .rdd.map(lambda x: (x['topic_id'], (x['ctr'], x['views'], x['distinct_ad_ids'], x['avg_confidence_level_top']))).collectAsMap() len(topic_id_popularity) topic_id_popularity_broad = sc.broadcast(topic_id_popularity) sum(map(lambda x: x[1], topic_id_popularity.values())) / float(len(topic_id_popularity)) sum(map(lambda x: x[2] * x[1], topic_id_popularity.values())) / float(len(topic_id_popularity)) # ### Average CTR by (country, topic) topic_id_by_country_popularity_df = train_set_df.join( documents_topics_df.alias('top_local'), on=F.col("document_id_promo") == F.col("top_local.document_id_top"), how='inner') \ .select('clicked', 'topic_id', 'confidence_level_top', 'event_country', 'ad_id') \ .groupby('event_country', 'topic_id').agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.mean('confidence_level_top').alias('avg_confidence_level_top'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) topic_id_id_by_country_popularity = topic_id_by_country_popularity_df \ .filter('views > 10 and event_country <> ""') \ .select('event_country', 'topic_id', 'ctr', 'views', 'avg_confidence_level_top', 'distinct_ad_ids') \ .rdd.map(lambda x: ((x['event_country'], x['topic_id']), (x['ctr'], x['views'], x['distinct_ad_ids'], x['avg_confidence_level_top']))).collectAsMap() len(topic_id_id_by_country_popularity) topic_id_id_by_country_popularity_broad = sc.broadcast(topic_id_id_by_country_popularity) # ### Average CTR by Entity entity_id_popularity_df = train_set_df.join( documents_entities_df.alias('ent_local'), on=F.col("document_id_promo") == F.col("ent_local.document_id_ent"), how='inner') \ .select('clicked', 'entity_id', 'confidence_level_ent', 'ad_id') \ .groupby('entity_id').agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.mean('confidence_level_ent').alias('avg_confidence_level_ent'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) entity_id_popularity = entity_id_popularity_df.filter('views > 5') \ .select('entity_id', 'ctr', 'views', 'avg_confidence_level_ent', 'distinct_ad_ids') \ .rdd.map(lambda x: (x['entity_id'], (x['ctr'], x['views'], x['distinct_ad_ids'], x['avg_confidence_level_ent']))).collectAsMap() len(entity_id_popularity) entity_id_popularity_broad = sc.broadcast(entity_id_popularity) np.median(np.array(list(map(lambda x: x[1], entity_id_popularity.values())))) sum(map(lambda x: x[1], entity_id_popularity.values())) / float(len(entity_id_popularity)) # ### Average CTR by (country, entity) entity_id_by_country_popularity_df = train_set_df.join( documents_entities_df.alias('ent_local'), on=F.col("document_id_promo") == F.col("ent_local.document_id_ent"), how='inner') \ .select('clicked', 'entity_id', 'event_country', 'confidence_level_ent', 'ad_id') \ .groupby('event_country', 'entity_id').agg(F.sum('clicked').alias('clicks'), F.count('*').alias('views'), F.mean('confidence_level_ent').alias('avg_confidence_level_ent'), F.countDistinct('ad_id').alias('distinct_ad_ids')) \ .withColumn('ctr', ctr_udf('clicks', 'views')) entity_id_by_country_popularity = entity_id_by_country_popularity_df \ .filter('views > 5 and event_country <> ""') \ .select('event_country', 'entity_id', 'ctr', 'views', 'avg_confidence_level_ent', 'distinct_ad_ids') \ .rdd.map(lambda x: ((x['event_country'], x['entity_id']), (x['ctr'], x['views'], x['distinct_ad_ids'], x['avg_confidence_level_ent']))).collectAsMap() len(entity_id_by_country_popularity) entity_id_by_country_popularity_broad = sc.broadcast(entity_id_by_country_popularity) # ### Loading # docs by categories, topics, entities df_filenames_suffix = '' if evaluation: df_filenames_suffix = '_eval' with open(OUTPUT_BUCKET_FOLDER + 'categories_docs_counts' + df_filenames_suffix + '.pickle', 'rb') as input_file: categories_docs_counts = cPickle.load(input_file) len(categories_docs_counts) with open(OUTPUT_BUCKET_FOLDER + 'topics_docs_counts' + df_filenames_suffix + '.pickle', 'rb') as input_file: topics_docs_counts = cPickle.load(input_file) len(topics_docs_counts) with open(OUTPUT_BUCKET_FOLDER + 'entities_docs_counts' + df_filenames_suffix + '.pickle', 'rb') as input_file: entities_docs_counts = cPickle.load(input_file) len(entities_docs_counts) documents_total = documents_meta_df.count() documents_total # ## Exploring Publish Time publish_times_df = train_set_df.filter('publish_time is not null').select('document_id_promo', 'publish_time').distinct().select( F.col('publish_time').cast(IntegerType())) publish_time_percentiles = get_percentiles(publish_times_df, 'publish_time', quantiles_levels=[0.5], max_error_rate=0.001) publish_time_percentiles publish_time_median = int(publish_time_percentiles[0.5]) datetime.datetime.utcfromtimestamp(publish_time_median) def get_days_diff(newer_timestamp, older_timestamp): sec_diff = newer_timestamp - older_timestamp days_diff = sec_diff / 60 / 60 / 24 return days_diff def get_time_decay_factor(timestamp, timestamp_ref=None, alpha=0.001): if timestamp_ref is None: timestamp_ref = time.time() days_diff = get_days_diff(timestamp_ref, timestamp) denominator = math.pow(1 + alpha, days_diff) if denominator != 0: return 1.0 / denominator else: return 0.0 TIME_DECAY_ALPHA = 0.0005 ref_dates = [ 1476714880, # 7 days 1474727680, # 30 days 1469370880, # 90 days 1461508480, # 180 days 1445697280, # 1 year 1414161280 # 2 years ] for d in ref_dates: print(datetime.datetime.utcfromtimestamp(d), get_time_decay_factor(d, alpha=TIME_DECAY_ALPHA)) # ### Get local time DEFAULT_TZ_EST = -4.0 def get_local_utc_bst_tz(event_country, event_country_state): local_tz = DEFAULT_TZ_EST if len(event_country) > 0: if event_country in countries_utc_dst_broad.value: local_tz = countries_utc_dst_broad.value[event_country] if len(event_country_state) > 2: state = event_country_state[3:5] if event_country == 'US': if state in us_states_utc_dst_broad.value: local_tz = us_states_utc_dst_broad.value[state] elif event_country == 'CA': if state in ca_countries_utc_dst_broad.value: local_tz = ca_countries_utc_dst_broad.value[state] return float(local_tz) hour_bins_dict = {'EARLY_MORNING': 0, 'MORNING': 1, 'MIDDAY': 2, 'AFTERNOON': 3, 'EVENING': 4, 'NIGHT': 5} hour_bins_values = sorted(hour_bins_dict.values()) def get_hour_bin(hour): if hour >= 5 and hour < 8: hour_bin = hour_bins_dict['EARLY_MORNING'] elif hour >= 8 and hour < 11: hour_bin = hour_bins_dict['MORNING'] elif hour >= 11 and hour < 14: hour_bin = hour_bins_dict['MIDDAY'] elif hour >= 14 and hour < 19: hour_bin = hour_bins_dict['AFTERNOON'] elif hour >= 19 and hour < 22: hour_bin = hour_bins_dict['EVENING'] else: hour_bin = hour_bins_dict['NIGHT'] return hour_bin def get_local_datetime(dt, event_country, event_country_state): local_tz = get_local_utc_bst_tz(event_country, event_country_state) tz_delta = local_tz - DEFAULT_TZ_EST local_time = dt + datetime.timedelta(hours=tz_delta) return local_time get_local_datetime(datetime.datetime.now(), 'US', 'US>CA') def is_weekend(dt): return dt.weekday() >= 5 is_weekend(datetime.datetime(2016, 6, 14)) # ## Average CTR functions timestamp_ref = date_time_to_unix_epoch(datetime.datetime(2016, 6, 29, 3, 59, 59)) decay_factor_default = get_time_decay_factor(publish_time_median, timestamp_ref, alpha=TIME_DECAY_ALPHA) print("decay_factor_default", decay_factor_default) def get_confidence_sample_size(sample, max_for_reference=100000): # Avoiding overflow for large sample size if sample >= max_for_reference: return 1.0 ref_log = math.log(1 + max_for_reference, 2) # Curiosly reference in log with base 2 gives a slightly higher score, so I will keep return math.log(1 + sample) / float(ref_log) for i in [0, 0.5, 1, 2, 3, 4, 5, 10, 20, 30, 100, 200, 300, 1000, 2000, 3000, 10000, 20000, 30000, 50000, 90000, 100000, 500000, 900000, 1000000, 2171607]: print(i, get_confidence_sample_size(i)) def get_popularity(an_id, a_dict): return (a_dict[an_id][0], get_confidence_sample_size(a_dict[an_id][1] / float(a_dict[an_id][2])) * a_dict[an_id][ 3]) if an_id in a_dict else (None, None) def get_weighted_avg_popularity_from_list(ids_list, confidence_ids_list, pop_dict): pops = list(filter(lambda x: x[0][0] is not None, [(get_popularity(an_id, pop_dict), confidence) for an_id, confidence in zip(ids_list, confidence_ids_list)])) # print("pops",pops) if len(pops) > 0: weighted_avg = sum(map(lambda x: x[0][0] * x[0][1] * x[1], pops)) / float( sum(map(lambda x: x[0][1] * x[1], pops))) confidence = max(map(lambda x: x[0][1] * x[1], pops)) return weighted_avg, confidence else: return None, None def get_weighted_avg_country_popularity_from_list(event_country, ids_list, confidence_ids_list, pop_dict): pops = list(filter(lambda x: x[0][0] is not None, [(get_popularity((event_country, an_id), pop_dict), confidence) for an_id, confidence in zip(ids_list, confidence_ids_list)])) if len(pops) > 0: weighted_avg = sum(map(lambda x: x[0][0] * x[0][1] * x[1], pops)) / float( sum(map(lambda x: x[0][1] * x[1], pops))) confidence = max(map(lambda x: x[0][1] * x[1], pops)) return weighted_avg, confidence else: return None, None def get_popularity_score(event_country, ad_id, document_id, source_id, publisher_id, advertiser_id, campaign_id, document_id_event, category_ids_by_doc, cat_confidence_level_by_doc, topic_ids_by_doc, top_confidence_level_by_doc, entity_ids_by_doc, ent_confidence_level_by_doc, output_detailed_list=False): probs = [] avg_ctr, confidence = get_popularity(ad_id, ad_id_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_ad_id', avg_ctr, confidence)) avg_ctr, confidence = get_popularity(document_id, document_id_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_document_id', avg_ctr, confidence)) avg_ctr, confidence = get_popularity((document_id_event, document_id), doc_event_doc_ad_avg_ctr_broad.value) if avg_ctr is not None: probs.append(('pop_doc_event_doc_ad', avg_ctr, confidence)) if source_id != -1: avg_ctr = None if event_country != '': avg_ctr, confidence = get_popularity((event_country, source_id), source_id_by_country_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_source_id_country', avg_ctr, confidence)) avg_ctr, confidence = get_popularity(source_id, source_id_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_source_id', avg_ctr, confidence)) if publisher_id is not None: avg_ctr, confidence = get_popularity(publisher_id, publisher_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_publisher_id', avg_ctr, confidence)) if advertiser_id is not None: avg_ctr, confidence = get_popularity(advertiser_id, advertiser_id_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_advertiser_id', avg_ctr, confidence)) if campaign_id is not None: avg_ctr, confidence = get_popularity(campaign_id, campaign_id_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_campain_id', avg_ctr, confidence)) if len(entity_ids_by_doc) > 0: avg_ctr = None if event_country != '': avg_ctr, confidence = get_weighted_avg_country_popularity_from_list( event_country, entity_ids_by_doc, ent_confidence_level_by_doc, entity_id_by_country_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_entity_id_country', avg_ctr, confidence)) avg_ctr, confidence = get_weighted_avg_popularity_from_list( entity_ids_by_doc, ent_confidence_level_by_doc, entity_id_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_entity_id', avg_ctr, confidence)) if len(topic_ids_by_doc) > 0: avg_ctr = None if event_country != '': avg_ctr, confidence = get_weighted_avg_country_popularity_from_list( event_country, topic_ids_by_doc, top_confidence_level_by_doc, topic_id_id_by_country_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_topic_id_country', avg_ctr, confidence)) avg_ctr, confidence = get_weighted_avg_popularity_from_list( topic_ids_by_doc, top_confidence_level_by_doc, topic_id_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_topic_id', avg_ctr, confidence)) if len(category_ids_by_doc) > 0: avg_ctr = None if event_country != '': avg_ctr, confidence = get_weighted_avg_country_popularity_from_list( event_country, category_ids_by_doc, cat_confidence_level_by_doc, category_id_by_country_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_category_id_country', avg_ctr, confidence)) avg_ctr, confidence = get_weighted_avg_popularity_from_list( category_ids_by_doc, cat_confidence_level_by_doc, category_id_popularity_broad.value) if avg_ctr is not None: probs.append(('pop_category_id', avg_ctr, confidence)) # print("[get_popularity_score] probs", probs) if output_detailed_list: return probs else: if len(probs) > 0: # weighted_avg_probs_by_confidence = sum(map(lambda x: x[1] * math.log(1+x[2],2), probs)) \ # / float(sum(map(lambda x: math.log(1+x[2],2), probs))) weighted_avg_probs_by_confidence = sum(map(lambda x: x[1] * x[2], probs)) / float( sum(map(lambda x: x[2], probs))) confidence = max(map(lambda x: x[2], probs)) return weighted_avg_probs_by_confidence, confidence else: return None, None # ## Content-Based similarity functions def cosine_similarity_dicts(dict1, dict2): dict1_norm = math.sqrt(sum([v ** 2 for v in dict1.values()])) dict2_norm = math.sqrt(sum([v ** 2 for v in dict2.values()])) sum_common_aspects = 0.0 intersections = 0 for key in dict1: if key in dict2: sum_common_aspects += dict1[key] * dict2[key] intersections += 1 return sum_common_aspects / (dict1_norm * dict2_norm), intersections def cosine_similarity_user_docs_aspects(user_aspect_profile, doc_aspect_ids, doc_aspects_confidence, aspect_docs_counts): if user_aspect_profile is None or len(user_aspect_profile) == 0 or doc_aspect_ids is None or len( doc_aspect_ids) == 0: return None, None doc_aspects = dict(zip(doc_aspect_ids, doc_aspects_confidence)) doc_aspects_tfidf_confid = {} for key in doc_aspects: tf = 1.0 idf = math.log(math.log(documents_total / float(aspect_docs_counts[key]))) confidence = doc_aspects[key] doc_aspects_tfidf_confid[key] = tf * idf * confidence user_aspects_tfidf_confid = {} for key in user_aspect_profile: tfidf = user_aspect_profile[key][0] confidence = user_aspect_profile[key][1] user_aspects_tfidf_confid[key] = tfidf * confidence similarity, intersections = cosine_similarity_dicts(doc_aspects_tfidf_confid, user_aspects_tfidf_confid) if intersections > 0: # P(A intersect B)_intersections = P(A)^intersections * P(B)^intersections random_error = math.pow(len(doc_aspects) / float(len(aspect_docs_counts)), intersections) * math.pow(len(user_aspect_profile) / float(len(aspect_docs_counts)), intersections) else: # P(A not intersect B) = 1 - P(A intersect B) random_error = 1 - ((len(doc_aspects) / float(len(aspect_docs_counts))) * (len(user_aspect_profile) / float(len(aspect_docs_counts)))) confidence = 1.0 - random_error return similarity, confidence def cosine_similarity_doc_event_doc_ad_aspects(doc_event_aspect_ids, doc_event_aspects_confidence, doc_ad_aspect_ids, doc_ad_aspects_confidence, aspect_docs_counts): if doc_event_aspect_ids is None or len(doc_event_aspect_ids) == 0 \ or doc_ad_aspect_ids is None or len(doc_ad_aspect_ids) == 0: return None, None doc_event_aspects = dict(zip(doc_event_aspect_ids, doc_event_aspects_confidence)) doc_event_aspects_tfidf_confid = {} for key in doc_event_aspect_ids: tf = 1.0 idf = math.log(math.log(documents_total / float(aspect_docs_counts[key]))) confidence = doc_event_aspects[key] doc_event_aspects_tfidf_confid[key] = tf * idf * confidence doc_ad_aspects = dict(zip(doc_ad_aspect_ids, doc_ad_aspects_confidence)) doc_ad_aspects_tfidf_confid = {} for key in doc_ad_aspect_ids: tf = 1.0 idf = math.log(math.log(documents_total / float(aspect_docs_counts[key]))) confidence = doc_ad_aspects[key] doc_ad_aspects_tfidf_confid[key] = tf * idf * confidence similarity, intersections = cosine_similarity_dicts(doc_event_aspects_tfidf_confid, doc_ad_aspects_tfidf_confid) if intersections > 0: # P(A intersect B)_intersections = P(A)^intersections * P(B)^intersections random_error = math.pow(len(doc_event_aspect_ids) / float(len(aspect_docs_counts)), intersections) * math.pow(len(doc_ad_aspect_ids) / float(len(aspect_docs_counts)), intersections) else: # P(A not intersect B) = 1 - P(A intersect B) random_error = 1 - ((len(doc_event_aspect_ids) / float(len(aspect_docs_counts))) * (len(doc_ad_aspect_ids) / float(len(aspect_docs_counts)))) confidence = 1.0 - random_error return similarity, confidence def get_user_cb_interest_score(user_views_count, user_categories, user_topics, user_entities, timestamp_event, category_ids_by_doc, cat_confidence_level_by_doc, topic_ids_by_doc, top_confidence_level_by_doc, entity_ids_by_doc, ent_confidence_level_by_doc, output_detailed_list=False): # Content-Based sims = [] categories_similarity, cat_sim_confidence = cosine_similarity_user_docs_aspects(user_categories, category_ids_by_doc, cat_confidence_level_by_doc, categories_docs_counts) if categories_similarity is not None: sims.append(('user_doc_ad_sim_categories', categories_similarity, cat_sim_confidence)) topics_similarity, top_sim_confidence = cosine_similarity_user_docs_aspects(user_topics, topic_ids_by_doc, top_confidence_level_by_doc, topics_docs_counts) if topics_similarity is not None: sims.append(('user_doc_ad_sim_topics', topics_similarity, top_sim_confidence)) entities_similarity, entity_sim_confid = cosine_similarity_user_docs_aspects(user_entities, entity_ids_by_doc, ent_confidence_level_by_doc, entities_docs_counts) if entities_similarity is not None: sims.append(('user_doc_ad_sim_entities', entities_similarity, entity_sim_confid)) if output_detailed_list: return sims else: if len(sims) > 0: weighted_avg_sim_by_confidence = sum(map(lambda x: x[1] * x[2], sims)) / float( sum(map(lambda x: x[2], sims))) confidence = sum(map(lambda x: x[2], sims)) / float(len(sims)) # print("[get_user_cb_interest_score] sims: {} | \ # Avg: {} - Confid: {}".format(sims, weighted_avg_sim_by_confidence, confidence)) return weighted_avg_sim_by_confidence, confidence else: return None, None def get_doc_event_doc_ad_cb_similarity_score(doc_event_category_ids, doc_event_cat_confidence_levels, doc_event_topic_ids, doc_event_top_confidence_levels, doc_event_entity_ids, doc_event_ent_confidence_levels, doc_ad_category_ids, doc_ad_cat_confidence_levels, doc_ad_topic_ids, doc_ad_top_confidence_levels, doc_ad_entity_ids, doc_ad_ent_confidence_levels, output_detailed_list=False): # Content-Based sims = [] categories_similarity, cat_sim_confidence = cosine_similarity_doc_event_doc_ad_aspects( doc_event_category_ids, doc_event_cat_confidence_levels, doc_ad_category_ids, doc_ad_cat_confidence_levels, categories_docs_counts) if categories_similarity is not None: sims.append(('doc_event_doc_ad_sim_categories', categories_similarity, cat_sim_confidence)) topics_similarity, top_sim_confidence = cosine_similarity_doc_event_doc_ad_aspects( doc_event_topic_ids, doc_event_top_confidence_levels, doc_ad_topic_ids, doc_ad_top_confidence_levels, topics_docs_counts) if topics_similarity is not None: sims.append(('doc_event_doc_ad_sim_topics', topics_similarity, top_sim_confidence)) entities_similarity, entity_sim_confid = cosine_similarity_doc_event_doc_ad_aspects( doc_event_entity_ids, doc_event_ent_confidence_levels, doc_ad_entity_ids, doc_ad_ent_confidence_levels, entities_docs_counts) if entities_similarity is not None: sims.append(('doc_event_doc_ad_sim_entities', entities_similarity, entity_sim_confid)) if output_detailed_list: return sims else: if len(sims) > 0: weighted_avg_sim_by_confidence = sum(map(lambda x: x[1] * x[2], sims)) / float( sum(map(lambda x: x[2], sims))) confidence = sum(map(lambda x: x[2], sims)) / float(len(sims)) # print("[get_user_cb_interest_score] sims: {} | \ # Avg: {} - Confid: {}".format(sims, weighted_avg_sim_by_confidence, confidence)) return weighted_avg_sim_by_confidence, confidence else: return None, None # # Feature Vector export bool_feature_names = ['event_weekend', 'user_has_already_viewed_doc'] int_feature_names = ['user_views', 'ad_views', 'doc_views', 'doc_event_days_since_published', 'doc_event_hour', 'doc_ad_days_since_published', ] float_feature_names = [ 'pop_ad_id', 'pop_ad_id_conf', 'pop_ad_id_conf_multipl', 'pop_document_id', 'pop_document_id_conf', 'pop_document_id_conf_multipl', 'pop_publisher_id', 'pop_publisher_id_conf', 'pop_publisher_id_conf_multipl', 'pop_advertiser_id', 'pop_advertiser_id_conf', 'pop_advertiser_id_conf_multipl', 'pop_campain_id', 'pop_campain_id_conf', 'pop_campain_id_conf_multipl', 'pop_doc_event_doc_ad', 'pop_doc_event_doc_ad_conf', 'pop_doc_event_doc_ad_conf_multipl', 'pop_source_id', 'pop_source_id_conf', 'pop_source_id_conf_multipl', 'pop_source_id_country', 'pop_source_id_country_conf', 'pop_source_id_country_conf_multipl', 'pop_entity_id', 'pop_entity_id_conf', 'pop_entity_id_conf_multipl', 'pop_entity_id_country', 'pop_entity_id_country_conf', 'pop_entity_id_country_conf_multipl', 'pop_topic_id', 'pop_topic_id_conf', 'pop_topic_id_conf_multipl', 'pop_topic_id_country', 'pop_topic_id_country_conf', 'pop_topic_id_country_conf_multipl', 'pop_category_id', 'pop_category_id_conf', 'pop_category_id_conf_multipl', 'pop_category_id_country', 'pop_category_id_country_conf', 'pop_category_id_country_conf_multipl', 'user_doc_ad_sim_categories', 'user_doc_ad_sim_categories_conf', 'user_doc_ad_sim_categories_conf_multipl', 'user_doc_ad_sim_topics', 'user_doc_ad_sim_topics_conf', 'user_doc_ad_sim_topics_conf_multipl', 'user_doc_ad_sim_entities', 'user_doc_ad_sim_entities_conf', 'user_doc_ad_sim_entities_conf_multipl', 'doc_event_doc_ad_sim_categories', 'doc_event_doc_ad_sim_categories_conf', 'doc_event_doc_ad_sim_categories_conf_multipl', 'doc_event_doc_ad_sim_topics', 'doc_event_doc_ad_sim_topics_conf', 'doc_event_doc_ad_sim_topics_conf_multipl', 'doc_event_doc_ad_sim_entities', 'doc_event_doc_ad_sim_entities_conf', 'doc_event_doc_ad_sim_entities_conf_multipl' ] TRAFFIC_SOURCE_FV = 'traffic_source' EVENT_HOUR_FV = 'event_hour' EVENT_COUNTRY_FV = 'event_country' EVENT_COUNTRY_STATE_FV = 'event_country_state' EVENT_GEO_LOCATION_FV = 'event_geo_location' EVENT_PLATFORM_FV = 'event_platform' AD_ADVERTISER_FV = 'ad_advertiser' DOC_AD_SOURCE_ID_FV = 'doc_ad_source_id' DOC_AD_PUBLISHER_ID_FV = 'doc_ad_publisher_id' DOC_EVENT_SOURCE_ID_FV = 'doc_event_source_id' DOC_EVENT_PUBLISHER_ID_FV = 'doc_event_publisher_id' DOC_AD_CATEGORY_ID_FV = 'doc_ad_category_id' DOC_AD_TOPIC_ID_FV = 'doc_ad_topic_id' DOC_AD_ENTITY_ID_FV = 'doc_ad_entity_id' DOC_EVENT_CATEGORY_ID_FV = 'doc_event_category_id' DOC_EVENT_TOPIC_ID_FV = 'doc_event_topic_id' DOC_EVENT_ENTITY_ID_FV = 'doc_event_entity_id' # ### Configuring feature vector category_feature_names_integral = ['ad_advertiser', 'doc_ad_category_id_1', 'doc_ad_category_id_2', 'doc_ad_category_id_3', 'doc_ad_topic_id_1', 'doc_ad_topic_id_2', 'doc_ad_topic_id_3', 'doc_ad_entity_id_1', 'doc_ad_entity_id_2', 'doc_ad_entity_id_3', 'doc_ad_entity_id_4', 'doc_ad_entity_id_5', 'doc_ad_entity_id_6', 'doc_ad_publisher_id', 'doc_ad_source_id', 'doc_event_category_id_1', 'doc_event_category_id_2', 'doc_event_category_id_3', 'doc_event_topic_id_1', 'doc_event_topic_id_2', 'doc_event_topic_id_3', 'doc_event_entity_id_1', 'doc_event_entity_id_2', 'doc_event_entity_id_3', 'doc_event_entity_id_4', 'doc_event_entity_id_5', 'doc_event_entity_id_6', 'doc_event_publisher_id', 'doc_event_source_id', 'event_country', 'event_country_state', 'event_geo_location', 'event_hour', 'event_platform', 'traffic_source'] feature_vector_labels_integral = bool_feature_names \ + int_feature_names \ + float_feature_names \ + category_feature_names_integral feature_vector_labels_integral_dict = dict([(key, idx) for idx, key in enumerate(feature_vector_labels_integral)]) with open(OUTPUT_BUCKET_FOLDER + 'feature_vector_labels_integral.txt', 'w') as output: output.writelines('\n'.join(feature_vector_labels_integral)) # ### Building feature vectors def set_feature_vector_cat_value_integral(field_name, field_value, feature_vector): if not is_null(field_value): # and str(field_value) != '-1': feature_vector[feature_vector_labels_integral_dict[field_name]] = float(field_value) def set_feature_vector_cat_top_multi_values_integral( field_name, values, confidences, feature_vector, top=5): top_values = list(filter(lambda z: z != -1, map(lambda y: y[0], sorted(zip(values, confidences), key=lambda x: -x[1]))))[:top] for idx, field_value in list(enumerate(top_values)): set_feature_vector_cat_value_integral( '{}_{}'.format(field_name, idx + 1), field_value, feature_vector) def get_ad_feature_vector_integral( user_doc_ids_viewed, user_views_count, user_categories, user_topics, user_entities, event_country, event_country_state, ad_id, document_id, source_id, doc_ad_publish_time, timestamp_event, platform_event, geo_location_event, doc_event_source_id, doc_event_publisher_id, doc_event_publish_time, traffic_source_pv, advertiser_id, publisher_id, campaign_id, document_id_event, doc_ad_category_ids, doc_ad_cat_confidence_levels, doc_ad_topic_ids, doc_ad_top_confidence_levels, doc_ad_entity_ids, doc_ad_ent_confidence_levels, doc_event_category_ids, doc_event_cat_confidence_levels, doc_event_topic_ids, doc_event_top_confidence_levels, doc_event_entity_ids, doc_event_ent_confidence_levels): try: feature_vector = {} if user_views_count is not None: feature_vector[feature_vector_labels_integral_dict['user_views']] = float(user_views_count) if user_doc_ids_viewed is not None: feature_vector[feature_vector_labels_integral_dict['user_has_already_viewed_doc']] = float( document_id in user_doc_ids_viewed) if ad_id in ad_id_popularity_broad.value: feature_vector[feature_vector_labels_integral_dict['ad_views']] = float( ad_id_popularity_broad.value[ad_id][1]) if document_id in document_id_popularity_broad.value: feature_vector[feature_vector_labels_integral_dict['doc_views']] = float( document_id_popularity_broad.value[document_id][1]) if timestamp_event > -1: dt_timestamp_event = convert_odd_timestamp(timestamp_event) if doc_ad_publish_time is not None: delta_days = (dt_timestamp_event - doc_ad_publish_time).days if 0 <= delta_days <= 365 * 10: # 10 years feature_vector[feature_vector_labels_integral_dict['doc_ad_days_since_published']] = float( delta_days) if doc_event_publish_time is not None: delta_days = (dt_timestamp_event - doc_event_publish_time).days if 0 <= delta_days <= 365 * 10: # 10 years feature_vector[feature_vector_labels_integral_dict['doc_event_days_since_published']] = float( delta_days) # Local period of the day (hours) dt_local_timestamp_event = get_local_datetime(dt_timestamp_event, event_country, event_country_state) local_hour_bin = get_hour_bin(dt_local_timestamp_event.hour) feature_vector[feature_vector_labels_integral_dict['doc_event_hour']] = float( local_hour_bin) # Hour for Decision Trees set_feature_vector_cat_value_integral(EVENT_HOUR_FV, local_hour_bin, feature_vector) # Period of day for FFM # Weekend weekend = int(is_weekend(dt_local_timestamp_event)) feature_vector[feature_vector_labels_integral_dict['event_weekend']] = float(weekend) conf_field_suffix = '_conf' conf_multiplied_field_suffix = '_conf_multipl' # Setting Popularity fields pop_scores = get_popularity_score(event_country, ad_id, document_id, source_id, publisher_id, advertiser_id, campaign_id, document_id_event, doc_ad_category_ids, doc_ad_cat_confidence_levels, doc_ad_topic_ids, doc_ad_top_confidence_levels, doc_ad_entity_ids, doc_ad_ent_confidence_levels, output_detailed_list=True) for score in pop_scores: feature_vector[feature_vector_labels_integral_dict[score[0]]] = score[1] feature_vector[feature_vector_labels_integral_dict[score[0] + conf_field_suffix]] = score[2] feature_vector[feature_vector_labels_integral_dict[score[0] + conf_multiplied_field_suffix]] = \ score[1] * score[2] # Setting User-Doc_ad CB Similarity fields user_doc_ad_cb_sim_scores = get_user_cb_interest_score( user_views_count, user_categories, user_topics, user_entities, timestamp_event, doc_ad_category_ids, doc_ad_cat_confidence_levels, doc_ad_topic_ids, doc_ad_top_confidence_levels, doc_ad_entity_ids, doc_ad_ent_confidence_levels, output_detailed_list=True) for score in user_doc_ad_cb_sim_scores: feature_vector[feature_vector_labels_integral_dict[score[0]]] = score[1] feature_vector[feature_vector_labels_integral_dict[score[0] + conf_field_suffix]] = score[2] feature_vector[feature_vector_labels_integral_dict[score[0] + conf_multiplied_field_suffix]] = \ score[1] * score[2] # Setting Doc_event-doc_ad CB Similarity fields doc_event_doc_ad_cb_sim_scores = get_doc_event_doc_ad_cb_similarity_score( doc_event_category_ids, doc_event_cat_confidence_levels, doc_event_topic_ids, doc_event_top_confidence_levels, doc_event_entity_ids, doc_event_ent_confidence_levels, doc_ad_category_ids, doc_ad_cat_confidence_levels, doc_ad_topic_ids, doc_ad_top_confidence_levels, doc_ad_entity_ids, doc_ad_ent_confidence_levels, output_detailed_list=True) for score in doc_event_doc_ad_cb_sim_scores: feature_vector[feature_vector_labels_integral_dict[score[0]]] = score[1] feature_vector[feature_vector_labels_integral_dict[score[0] + conf_field_suffix]] = score[2] feature_vector[feature_vector_labels_integral_dict[score[0] + conf_multiplied_field_suffix]] = \ score[1] * score[2] # Process code for event_country if event_country in event_country_values_counts: event_country_code = event_country_values_counts[event_country] else: event_country_code = event_country_values_counts[LESS_SPECIAL_CAT_VALUE] set_feature_vector_cat_value_integral(EVENT_COUNTRY_FV, event_country_code, feature_vector) # Process code for event_country_state if event_country_state in event_country_state_values_counts: event_country_state_code = event_country_state_values_counts[event_country_state] else: event_country_state_code = event_country_state_values_counts[LESS_SPECIAL_CAT_VALUE] set_feature_vector_cat_value_integral(EVENT_COUNTRY_STATE_FV, event_country_state_code, feature_vector) # Process code for geo_location_event if geo_location_event in event_geo_location_values_counts: geo_location_event_code = event_geo_location_values_counts[geo_location_event] else: geo_location_event_code = event_geo_location_values_counts[LESS_SPECIAL_CAT_VALUE] # -1 to traffic_source and platform_event if traffic_source_pv is not None: feature_vector[feature_vector_labels_integral_dict[TRAFFIC_SOURCE_FV]] = int(traffic_source_pv - 1) if platform_event is not None: feature_vector[feature_vector_labels_integral_dict[EVENT_PLATFORM_FV]] = int(platform_event - 1) set_feature_vector_cat_value_integral(EVENT_GEO_LOCATION_FV, geo_location_event_code, feature_vector) # set_feature_vector_cat_value_integral(TRAFFIC_SOURCE_FV, traffic_source_pv - 1, feature_vector) # set_feature_vector_cat_value_integral(EVENT_PLATFORM_FV, platform_event - 1, feature_vector) set_feature_vector_cat_value_integral(AD_ADVERTISER_FV, advertiser_id, feature_vector) set_feature_vector_cat_value_integral(DOC_AD_SOURCE_ID_FV, source_id, feature_vector) set_feature_vector_cat_value_integral(DOC_AD_PUBLISHER_ID_FV, publisher_id, feature_vector) set_feature_vector_cat_value_integral(DOC_EVENT_SOURCE_ID_FV, doc_event_source_id, feature_vector) set_feature_vector_cat_value_integral(DOC_EVENT_PUBLISHER_ID_FV, doc_event_publisher_id, feature_vector) set_feature_vector_cat_top_multi_values_integral(DOC_AD_CATEGORY_ID_FV, doc_ad_category_ids, doc_ad_cat_confidence_levels, feature_vector, top=3) set_feature_vector_cat_top_multi_values_integral(DOC_AD_TOPIC_ID_FV, doc_ad_topic_ids, doc_ad_top_confidence_levels, feature_vector, top=3) set_feature_vector_cat_top_multi_values_integral(DOC_EVENT_CATEGORY_ID_FV, doc_event_category_ids, doc_event_cat_confidence_levels, feature_vector, top=3) set_feature_vector_cat_top_multi_values_integral(DOC_EVENT_TOPIC_ID_FV, doc_event_topic_ids, doc_event_top_confidence_levels, feature_vector, top=3) # Process codes for doc_ad_entity_ids doc_ad_entity_ids_codes = [doc_entity_id_values_counts[x] if x in doc_entity_id_values_counts else doc_entity_id_values_counts[LESS_SPECIAL_CAT_VALUE] for x in doc_ad_entity_ids] set_feature_vector_cat_top_multi_values_integral(DOC_AD_ENTITY_ID_FV, doc_ad_entity_ids_codes, doc_ad_ent_confidence_levels, feature_vector, top=6) # Process codes for doc_event_entity_ids doc_event_entity_ids_codes = [doc_entity_id_values_counts[x] if x in doc_entity_id_values_counts else doc_entity_id_values_counts[LESS_SPECIAL_CAT_VALUE] for x in doc_event_entity_ids] set_feature_vector_cat_top_multi_values_integral(DOC_EVENT_ENTITY_ID_FV, doc_event_entity_ids_codes, doc_event_ent_confidence_levels, feature_vector, top=6) # Creating dummy column as the last column # because xgboost have a problem if the last column is undefined for all rows, # saying that dimentions of data and feature_names do not match # feature_vector[feature_vector_labels_dict[DUMMY_FEATURE_COLUMN]] = float(0) # Ensuring that all elements are floats for compatibility with UDF output (ArrayType(FloatType())) # feature_vector = list([float(x) for x in feature_vector]) except Exception as e: raise Exception("[get_ad_feature_vector_integral] ERROR PROCESSING FEATURE VECTOR! Params: {}" .format([user_doc_ids_viewed, user_views_count, user_categories, user_topics, user_entities, event_country, event_country_state, ad_id, document_id, source_id, doc_ad_publish_time, timestamp_event, platform_event, geo_location_event, doc_event_source_id, doc_event_publisher_id, doc_event_publish_time, traffic_source_pv, advertiser_id, publisher_id, campaign_id, document_id_event, doc_ad_category_ids, doc_ad_cat_confidence_levels, doc_ad_topic_ids, doc_ad_top_confidence_levels, doc_ad_entity_ids, doc_ad_ent_confidence_levels, doc_event_category_ids, doc_event_cat_confidence_levels, doc_event_topic_ids, doc_event_top_confidence_levels, doc_event_entity_ids, doc_event_ent_confidence_levels]), e) return SparseVector(len(feature_vector_labels_integral_dict), feature_vector) get_ad_feature_vector_integral_udf = F.udf( lambda user_doc_ids_viewed, user_views_count, user_categories, user_topics, user_entities, event_country, event_country_state, ad_id, document_id, source_id, doc_ad_publish_time, timestamp_event, platform_event, geo_location_event, doc_event_source_id, doc_event_publisher_id, doc_event_publish_time, traffic_source_pv, advertiser_id, publisher_id, campaign_id, document_id_event, category_ids_by_doc, cat_confidence_level_by_doc, topic_ids_by_doc, top_confidence_level_by_doc, entity_ids_by_doc, ent_confidence_level_by_doc, doc_event_category_id_list, doc_event_confidence_level_cat_list, doc_event_topic_id_list, doc_event_confidence_level_top, doc_event_entity_id_list, doc_event_confidence_level_ent: get_ad_feature_vector_integral(user_doc_ids_viewed, user_views_count, user_categories, user_topics, user_entities, event_country, event_country_state, ad_id, document_id, source_id, doc_ad_publish_time, timestamp_event, platform_event, geo_location_event, doc_event_source_id, doc_event_publisher_id, doc_event_publish_time, traffic_source_pv, advertiser_id, publisher_id, campaign_id, document_id_event, category_ids_by_doc, cat_confidence_level_by_doc, topic_ids_by_doc, top_confidence_level_by_doc, entity_ids_by_doc, ent_confidence_level_by_doc, doc_event_category_id_list, doc_event_confidence_level_cat_list, doc_event_topic_id_list, doc_event_confidence_level_top, doc_event_entity_id_list, doc_event_confidence_level_ent), VectorUDT()) # ## Export Train set feature vectors train_set_enriched_df = train_set_df \ .join(documents_categories_grouped_df, on=F.col("document_id_promo") == F.col("documents_categories_grouped.document_id_cat"), how='left') \ .join(documents_topics_grouped_df, on=F.col("document_id_promo") == F.col("documents_topics_grouped.document_id_top"), how='left') \ .join(documents_entities_grouped_df, on=F.col("document_id_promo") == F.col("documents_entities_grouped.document_id_ent"), how='left') \ .join(documents_categories_grouped_df .withColumnRenamed('category_id_list', 'doc_event_category_id_list') .withColumnRenamed('confidence_level_cat_list', 'doc_event_confidence_level_cat_list') .alias('documents_event_categories_grouped'), on=F.col("document_id_event") == F.col("documents_event_categories_grouped.document_id_cat"), how='left') \ .join(documents_topics_grouped_df .withColumnRenamed('topic_id_list', 'doc_event_topic_id_list') .withColumnRenamed('confidence_level_top_list', 'doc_event_confidence_level_top_list') .alias('documents_event_topics_grouped'), on=F.col("document_id_event") == F.col("documents_event_topics_grouped.document_id_top"), how='left') \ .join(documents_entities_grouped_df .withColumnRenamed('entity_id_list', 'doc_event_entity_id_list') .withColumnRenamed('confidence_level_ent_list', 'doc_event_confidence_level_ent_list') .alias('documents_event_entities_grouped'), on=F.col("document_id_event") == F.col("documents_event_entities_grouped.document_id_ent"), how='left') \ .select('display_id', 'uuid_event', 'event_country', 'event_country_state', 'platform_event', 'source_id_doc_event', 'publisher_doc_event', 'publish_time_doc_event', 'publish_time', 'ad_id', 'document_id_promo', 'clicked', 'geo_location_event', 'advertiser_id', 'publisher_id', 'campaign_id', 'document_id_event', 'traffic_source_pv', int_list_null_to_empty_list_udf('doc_event_category_id_list') .alias('doc_event_category_id_list'), float_list_null_to_empty_list_udf('doc_event_confidence_level_cat_list') .alias('doc_event_confidence_level_cat_list'), int_list_null_to_empty_list_udf('doc_event_topic_id_list') .alias('doc_event_topic_id_list'), float_list_null_to_empty_list_udf('doc_event_confidence_level_top_list') .alias('doc_event_confidence_level_top_list'), str_list_null_to_empty_list_udf('doc_event_entity_id_list') .alias('doc_event_entity_id_list'), float_list_null_to_empty_list_udf('doc_event_confidence_level_ent_list') .alias('doc_event_confidence_level_ent_list'), int_null_to_minus_one_udf('source_id').alias('source_id'), int_null_to_minus_one_udf('timestamp_event').alias('timestamp_event'), int_list_null_to_empty_list_udf('category_id_list').alias('category_id_list'), float_list_null_to_empty_list_udf('confidence_level_cat_list') .alias('confidence_level_cat_list'), int_list_null_to_empty_list_udf('topic_id_list').alias('topic_id_list'), float_list_null_to_empty_list_udf('confidence_level_top_list') .alias('confidence_level_top_list'), str_list_null_to_empty_list_udf('entity_id_list').alias('entity_id_list'), float_list_null_to_empty_list_udf('confidence_level_ent_list') .alias('confidence_level_ent_list')) \ .join(user_profiles_df, on=[F.col("user_profiles.uuid") == F.col("uuid_event")], how='left') \ .withColumnRenamed('categories', 'user_categories') \ .withColumnRenamed('topics', 'user_topics') \ .withColumnRenamed('entities', 'user_entities') \ .withColumnRenamed('doc_ids', 'user_doc_ids_viewed') \ .withColumnRenamed('views', 'user_views_count') train_set_feature_vectors_df = train_set_enriched_df \ .withColumn('feature_vector', get_ad_feature_vector_integral_udf( 'user_doc_ids_viewed', 'user_views_count', 'user_categories', 'user_topics', 'user_entities', 'event_country', 'event_country_state', 'ad_id', 'document_id_promo', 'source_id', 'publish_time', 'timestamp_event', 'platform_event', 'geo_location_event', 'source_id_doc_event', 'publisher_doc_event', 'publish_time_doc_event', 'traffic_source_pv', 'advertiser_id', 'publisher_id', 'campaign_id', 'document_id_event', 'category_id_list', 'confidence_level_cat_list', 'topic_id_list', 'confidence_level_top_list', 'entity_id_list', 'confidence_level_ent_list', 'doc_event_category_id_list', 'doc_event_confidence_level_cat_list', 'doc_event_topic_id_list', 'doc_event_confidence_level_top_list', 'doc_event_entity_id_list', 'doc_event_confidence_level_ent_list')) \ .select(F.col('uuid_event').alias('uuid'), 'display_id', 'ad_id', 'document_id_event', F.col('document_id_promo').alias('document_id'), F.col('clicked').alias('label'), 'feature_vector') if evaluation: train_feature_vector_gcs_folder_name = 'train_feature_vectors_integral_eval' else: train_feature_vector_gcs_folder_name = 'train_feature_vectors_integral' train_set_feature_vectors_df.write.parquet(OUTPUT_BUCKET_FOLDER + train_feature_vector_gcs_folder_name, mode='overwrite') # # Export Validation/Test set feature vectors def is_leak(max_timestamp_pv_leak, timestamp_event): return max_timestamp_pv_leak >= 0 and max_timestamp_pv_leak >= timestamp_event is_leak_udf = F.udf(lambda max_timestamp_pv_leak, timestamp_event: int(is_leak(max_timestamp_pv_leak, timestamp_event)), IntegerType()) if evaluation: data_df = validation_set_df else: data_df = test_set_df test_validation_set_enriched_df = data_df.select( 'display_id', 'uuid_event', 'event_country', 'event_country_state', 'platform_event', 'source_id_doc_event', 'publisher_doc_event', 'publish_time_doc_event', 'publish_time', 'ad_id', 'document_id_promo', 'clicked', 'geo_location_event', 'advertiser_id', 'publisher_id', 'campaign_id', 'document_id_event', 'traffic_source_pv', int_list_null_to_empty_list_udf('doc_event_category_id_list') .alias('doc_event_category_id_list'), float_list_null_to_empty_list_udf('doc_event_confidence_level_cat_list') .alias('doc_event_confidence_level_cat_list'), int_list_null_to_empty_list_udf('doc_event_topic_id_list') .alias('doc_event_topic_id_list'), float_list_null_to_empty_list_udf('doc_event_confidence_level_top_list') .alias('doc_event_confidence_level_top_list'), str_list_null_to_empty_list_udf('doc_event_entity_id_list') .alias('doc_event_entity_id_list'), float_list_null_to_empty_list_udf('doc_event_confidence_level_ent_list') .alias('doc_event_confidence_level_ent_list'), int_null_to_minus_one_udf('source_id') .alias('source_id'), int_null_to_minus_one_udf('timestamp_event').alias('timestamp_event'), int_list_null_to_empty_list_udf('category_id_list').alias('category_id_list'), float_list_null_to_empty_list_udf('confidence_level_cat_list') .alias('confidence_level_cat_list'), int_list_null_to_empty_list_udf('topic_id_list').alias('topic_id_list'), float_list_null_to_empty_list_udf('confidence_level_top_list') .alias('confidence_level_top_list'), str_list_null_to_empty_list_udf('entity_id_list').alias('entity_id_list'), float_list_null_to_empty_list_udf('confidence_level_ent_list') .alias('confidence_level_ent_list'), int_null_to_minus_one_udf('max_timestamp_pv').alias('max_timestamp_pv_leak')) \ .join(user_profiles_df, on=[F.col("user_profiles.uuid") == F.col("uuid_event")], how='left') \ .withColumnRenamed('categories', 'user_categories') \ .withColumnRenamed('topics', 'user_topics') \ .withColumnRenamed('entities', 'user_entities') \ .withColumnRenamed('doc_ids', 'user_doc_ids_viewed') \ .withColumnRenamed('views', 'user_views_count') test_validation_set_feature_vectors_df = test_validation_set_enriched_df \ .withColumn('feature_vector', get_ad_feature_vector_integral_udf( 'user_doc_ids_viewed', 'user_views_count', 'user_categories', 'user_topics', 'user_entities', 'event_country', 'event_country_state', 'ad_id', 'document_id_promo', 'source_id', 'publish_time', 'timestamp_event', 'platform_event', 'geo_location_event', 'source_id_doc_event', 'publisher_doc_event', 'publish_time_doc_event', 'traffic_source_pv', 'advertiser_id', 'publisher_id', 'campaign_id', 'document_id_event', 'category_id_list', 'confidence_level_cat_list', 'topic_id_list', 'confidence_level_top_list', 'entity_id_list', 'confidence_level_ent_list', 'doc_event_category_id_list', 'doc_event_confidence_level_cat_list', 'doc_event_topic_id_list', 'doc_event_confidence_level_top_list', 'doc_event_entity_id_list', 'doc_event_confidence_level_ent_list')) \ .select(F.col('uuid').alias('uuid'), 'display_id', 'ad_id', 'document_id_event', F.col('document_id_promo').alias('document_id'), F.col('clicked').alias('label'), is_leak_udf('max_timestamp_pv_leak', 'timestamp_event').alias('is_leak'), 'feature_vector') if evaluation: test_validation_feature_vector_gcs_folder_name = 'validation_feature_vectors_integral' else: test_validation_feature_vector_gcs_folder_name = 'test_feature_vectors_integral' test_validation_set_feature_vectors_df.write.parquet( OUTPUT_BUCKET_FOLDER + test_validation_feature_vector_gcs_folder_name, mode='overwrite') spark.stop()