Stones from other hills can serve to polish jade; in other words, there is much to learn from others' work.
This series draws on a technical post from the Huawei AI Gallery community.
Quick links:
Part (1) described the model framework the AI Gallery author built from basic match information as features, and discussed the strengths and weaknesses of that approach.
Part (2) covered how to treat football result prediction as a binary classification problem and the skills you need, namely Notebook + Python + Pandas.
Free data source 1
Free data source 2
If all you want is data analysis, for example using odds, Asian handicaps, Kelly indices and Betfair data to look for relationships with match results, then free sources are enough. Don't spend a lot of time and effort building crawlers first and only start the analysis once the data is complete, because the analysis may not give you any insight anyway.
This installment walks through the steps of the modeling process and the code.
# Read the data
from pyspark.sql import SparkSession
class MLSReadData:
def __init__(self,
input_file_path,
format="csv",
has_header=True,
delimiter=","):
"""
read dataset
:param input_file_path:
:param format:
:param has_header:
:param delimiter:
"""
self.input_file_path = input_file_path
self.format = format
self.has_header = has_header
self.delimiter = delimiter
self._outputs = {}
def run(self):
spark = SparkSession.builder.getOrCreate()
input_df = spark.read \
.format(self.format) \
.option("header", self.has_header) \
.option("delimiter", self.delimiter) \
.option("inferSchema", True) \
.load(self.input_file_path.strip())
column_names = input_df.columns
for column in column_names:
input_df = input_df.withColumnRenamed(column, column.strip())
self._outputs = {
"output_port_1": input_df
}
def get_outputs(self):
return self._outputs
params = {
"input_file_path": "./footballdata.csv", #@param {"label":"input_file_path","type":"string","required":"true","helpTip":""}
"format": "csv", #@param {"label":"format","type":"string","required":"false","helpTip":""}
"has_header": True, #@param {"label":"has_header","type":"boolean","required":"false","helpTip":""}
"delimiter": "," #@param {"label":"delimiter","type":"string","required":"false","helpTip":""}
}
read_data = MLSReadData(**params)
read_data.run()
#@output {"label":"dataframe","name":"read_data.get_outputs()['output_port_1']","type":"DataFrame"}
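A quick sanity check, not part of the original workflow, is to preview a few rows and the inferred schema before moving on; a minimal sketch:
# Preview the loaded DataFrame to confirm the CSV was parsed as expected
loaded_df = read_data.get_outputs()['output_port_1']
loaded_df.show(5)        # first five rows
loaded_df.printSchema()  # column names and the types inferred by inferSchema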
# Convert data types
from pyspark.sql.types import StringType, IntegerType, LongType, FloatType, DoubleType, BooleanType, DateType, \
TimestampType
from pyspark.sql.functions import col
class MLSModifyDataType:
"""
modify datatype of dataframe
"""
def __init__(self,
inputs,
column_type_map_str
):
"""
init
:param inputs:
dic of upstream node output, should have key: dataframe
:param column_type_map_str: the format like: "column_a:string,column_b:integer",
            column type can be: string, integer, long, float, double, bool, date, timestamp
"""
self.inputs = inputs
self.column_type_map_str = column_type_map_str
self.dataframe = None
self.column_type_map = {}
self._outputs = {}
def _check_and_solve_input_param_when_output(self):
# check param inputs
if not isinstance(self.inputs, dict):
raise Exception("parameter \"inputs\" should be dict and has key \"dataframe\"")
if "dataframe" not in self.inputs:
raise Exception("parameter \"inputs\" should have key: \"dataframe\"")
self.dataframe = self.inputs["dataframe"]
# check and solve column_type_map_str
if self.column_type_map_str is None or not isinstance(self.column_type_map_str, str) \
or not self.column_type_map_str.strip():
raise Exception("should input parameter \"column_type_map\", and the type should string")
pairs = self.column_type_map_str.strip().split(",")
for pair in pairs:
array = pair.strip().split(":")
if len(array) != 2:
raise Exception("parameter \"column_type_map_str\" should obey the format,"
"like \"column_a:string,column_b:integer\"")
self.column_type_map[array[0].strip()] = array[1].strip()
def _execute_self_node_output(self):
data_type_map = {
"string": StringType(),
"integer": IntegerType(),
"long": LongType(),
"float": FloatType(),
"double": DoubleType(),
"bool": BooleanType(),
"date": DateType(),
"timestamp": TimestampType()
}
result_dataframe = self.dataframe
for (column_name, data_type) in self.column_type_map.items():
result_dataframe = result_dataframe.withColumn(column_name,
col(column_name).cast(data_type_map[data_type]))
self._outputs = {
"output_port_1": result_dataframe
}
def run(self):
self._check_and_solve_input_param_when_output()
self._execute_self_node_output()
def get_outputs(self):
return self._outputs
inputs = {
"dataframe": read_data.get_outputs()['output_port_1'] #@input {"label":"dataframe","type":"DataFrame"}
}
params = {
"inputs": inputs,
"column_type_map_str": "neutral:string,month:string,season:string" #@param {"label":"column_type_map_str","type":"string","required":"true","helpTip":""}
}
modify_data_type = MLSModifyDataType(**params)
modify_data_type.run()
#@output {"label":"dataframe","name":"modify_data_type.get_outputs()['output_port_1']","type":"DataFrame"}
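To verify the casts took effect, the schema can be printed again; a small sketch, assuming the step above ran and the three columns exist in the dataset:
# neutral, month and season should now be reported as string columns
typed_df = modify_data_type.get_outputs()['output_port_1']
typed_df.select("neutral", "month", "season").printSchema()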
# Missing-value imputation: fill missing values with 0
from pyspark.sql.functions import when
import pyspark.sql.functions as F
class MLSMissingValueImpute:
"""
Impute missing value
"""
def __init__(self,inputs):
self.dataframe = inputs["dataframe"]
self._outputs = {}
def run(self):
        # find columns that contain at least one null, then replace those nulls with 0
        missing_columns = []
        for column in self.dataframe.columns:
            if self.dataframe.filter(self.dataframe[column].isNull()).count() > 0:
                missing_columns.append(column)
        for column in missing_columns:
            self.dataframe = self.dataframe.withColumn(
                column,
                when(self.dataframe[column].isNull(), F.lit(0)).otherwise(self.dataframe[column]))
self._outputs = {"output_port_1": self.dataframe}
def get_outputs(self):
return self._outputs
inputs = {
"dataframe": modify_data_type.get_outputs()['output_port_1'] #@input {"type":"DataFrame", "label": "dataframe"}
}
params = {
"inputs": inputs
}
missing_value_impute=MLSMissingValueImpute(**params)
missing_value_impute.run()
#@output {"label":"dataframe","name":"missing_value_impute.get_outputs()['output_port_1']","type":"DataFrame"}
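Note that the null scan above launches one Spark job per column, which can get slow on wide tables. As an alternative (a sketch, not the original code), the missing values of every column can be counted in a single aggregation, which also confirms that the imputation left no nulls behind:
# Count nulls for every column in one pass
import pyspark.sql.functions as F

imputed_df = missing_value_impute.get_outputs()['output_port_1']
null_counts = imputed_df.select(
    [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in imputed_df.columns])
null_counts.show()  # after imputation every count should be 0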
# Filter dataset rows: keep matches dated 2015-01-01 through 2019-12-31 as the training set
from pyspark.sql.dataframe import DataFrame
class MLSDatasetFilter:
"""
dataset filter
"""
def __init__(self,
inputs,
column_name,
condition_map_str
):
self.inputs = inputs
self.dataframe = None
self.column_name = column_name
self.condition_map_str = condition_map_str
self.condition_map = {}
self._outputs = {}
def _check_and_solve_param(self):
# check param inputs
if not isinstance(self.inputs, dict):
raise Exception("parameter \"inputs\" should be dict and has key \"dataframe\"")
if "dataframe" not in self.inputs:
raise Exception("parameter \"inputs\" should have key: \"dataframe\"")
self.dataframe = self.inputs["dataframe"]
# check param type
if not isinstance(self.dataframe, DataFrame):
raise Exception("parameter \"dataframe\" should be DataFrame of pyspark")
if not isinstance(self.column_name, str):
raise Exception("parameter \"column_name\" should be str")
if not isinstance(self.condition_map_str, str):
raise Exception("parameter \"condition_map_str\" should be str")
# solve param condition_map_str
pairs = self.condition_map_str.strip().split(";")
for pair in pairs:
array = pair.strip().split(":")
if len(array) != 1 and len(array) != 2:
raise Exception(
"parameter \"condition_map_str\" should have fixed format, please read the annotation.")
if len(array) == 2:
self.condition_map[array[0].strip()] = array[1].strip()
elif len(array) == 1:
self.condition_map[array[0].strip()] = ""
def _execute(self):
res_dataframe = self.dataframe
for (operator, value) in self.condition_map.items():
condition_expr = self.column_name.strip() + " " + operator.strip()
formated_operator = operator.strip().upper()
if formated_operator == 'BETWEEN' or formated_operator == 'NOT BETWEEN':
value_array = value.split(',')
if len(value_array) != 2:
raise Exception("if use expr 'between' or 'not between', the range value string should be"
"separated by comma, and the result should be array with length 2")
condition_expr = condition_expr + " '" + value_array[0].strip() + "' AND '" + value_array[1].strip() \
+ "'"
elif formated_operator == 'IS NULL' or formated_operator == 'IS NOT NULL':
condition_expr = condition_expr
else:
condition_expr = condition_expr + " '" + value.strip() + "'"
res_dataframe = res_dataframe.filter(condition_expr)
self._outputs = {
"output_port_1": res_dataframe
}
def run(self):
self._check_and_solve_param()
self._execute()
def get_outputs(self):
return self._outputs
inputs = {
"dataframe":missing_value_impute.get_outputs()['output_port_1'] #@input {"label":"dataframe","type":"DataFrame"}
}
params = {
"inputs": inputs,
"column_name": "date", #@param {"label":"column_name","type":"string","required":"true","helpTip":""}
"condition_map_str": "BETWEEN:2015-01-01,2019-12-31" #@param {"label":"condition_map_str","type":"string","required":"true","helpTip":""}
}
dataset_filter_train_data = MLSDatasetFilter(**params)
dataset_filter_train_data.run()
#@output {"label":"dataframe","name":"dataset_filter_train_data.get_outputs()['output_port_1']","type":"DataFrame"}
# Filter dataset rows: keep matches dated 2020-01-01 through 2021-05-31 as the validation set
inputs = {
"dataframe":missing_value_impute.get_outputs()['output_port_1'] #@input {"label":"dataframe","type":"DataFrame"}
}
params = {
"inputs": inputs,
"column_name": "date", #@param {"label":"column_name","type":"string","required":"true","helpTip":""}
"condition_map_str": "BETWEEN:2020-01-01,2021-05-31" #@param {"label":"condition_map_str","type":"string","required":"true","helpTip":""}
}
dataset_filter_valid_data = MLSDatasetFilter(**params)
dataset_filter_valid_data.run()
#@output {"label":"dataframe","name":"dataset_filter_valid_data.get_outputs()['output_port_1']","type":"DataFrame"}
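Before selecting features it is worth confirming the time-based split; a hedged sketch that just counts the rows in each range, using the step names defined above:
# Row counts of the train/validation split
train_split_df = dataset_filter_train_data.get_outputs()['output_port_1']
valid_split_df = dataset_filter_valid_data.get_outputs()['output_port_1']
print("train rows (2015-01-01 .. 2019-12-31):", train_split_df.count())
print("valid rows (2020-01-01 .. 2021-05-31):", valid_split_df.count())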
# Select feature columns from the training set as model input
class MLSSelectColumns:
"""
select columns
"""
def __init__(self,
inputs,
selected_cols_str):
"""
select specified columns of dataframe
:param inputs:
dic of upstream node output, should have key: dataframe
:param dataframe: dataframe for selecting some columns
        :param selected_cols_str: column names, as a comma-separated string
"""
self.inputs = inputs
self.selected_cols_str = selected_cols_str
self.dataframe = None
self.selected_cols = []
self._outputs = {}
def _check_and_solve_input_param_when_output(self):
# check param inputs
if not isinstance(self.inputs, dict):
raise Exception("parameter \"inputs\" should be dict and has key \"dataframe\"")
if "dataframe" not in self.inputs:
raise Exception("parameter \"inputs\" should have key: \"dataframe\"")
self.dataframe = self.inputs["dataframe"]
# check selected_cols_str
if self.selected_cols_str is None or not isinstance(self.selected_cols_str, str) \
or not self.selected_cols_str.strip():
raise Exception("should input parameter \"selected_cols_str\"")
self.selected_cols = [column.strip() for column in self.selected_cols_str.strip().split(",")]
column_set = set()
for column in self.dataframe.columns:
column_set.add(column)
for select_col in self.selected_cols:
if select_col not in column_set:
                raise Exception("column %s doesn't exist in dataframe columns" % select_col)
def run(self):
self._check_and_solve_input_param_when_output()
result_df = self.dataframe.select(self.selected_cols)
self._outputs = {"output_port_1": result_df}
def get_outputs(self):
return self._outputs
inputs = {
"dataframe": dataset_filter_train_data.get_outputs()['output_port_1'] #@input {"label":"dataframe","type":"DataFrame"}
}
params = {
"inputs": inputs,
"selected_cols_str": "month, season, home_team, away_team, tournament, neutral, win_result,num_5,diff_num_5,win_num_5,lose_num_5,num_3,diff_num_3,win_num_3,lose_num_3,num_1,diff_num_1,win_num_1,lose_num_1,\
num_team_5,diff_num_team_5,win_num_team_5,lose_num_team_5,num_team_3,diff_num_team_3,win_num_team_3,lose_num_team_3,num_team_1,diff_num_team_1,win_num_team_1,lose_num_team_1,\
num_year_15,diff_num_year_15,win_num_year_15,lose_num_year_15,num_year_7,diff_num_year_7,win_num_year_7,lose_num_year_7,num_year_3,diff_num_year_3,win_num_year_3,lose_num_year_3,\
num_year_2,diff_num_year_2,win_num_year_2,lose_num_year_2,num_year_1,diff_num_year_1,win_num_year_1,lose_num_year_1,\
away_num,away_win_num,away_lose_num,away_win_rate,home_num,home_win_num,home_lose_num,home_win_rate"}
select_columns_train_data = MLSSelectColumns(**params)
select_columns_train_data.run()
#@output {"label":"dataframe","name":"select_columns_train_data.get_outputs()['output_port_1']","type":"DataFrame"}
# Select feature columns from the validation set as prediction input
inputs = {
"dataframe": dataset_filter_valid_data.get_outputs()['output_port_1'] #@input {"label":"dataframe","type":"DataFrame"}
}
params = {
"inputs": inputs,
"selected_cols_str": "month, season, home_team, away_team, tournament, neutral, win_result,num_5,diff_num_5,win_num_5,lose_num_5,num_3,diff_num_3,win_num_3,lose_num_3,num_1,diff_num_1,win_num_1,lose_num_1,\
num_team_5,diff_num_team_5,win_num_team_5,lose_num_team_5,num_team_3,diff_num_team_3,win_num_team_3,lose_num_team_3,num_team_1,diff_num_team_1,win_num_team_1,lose_num_team_1,\
num_year_15,diff_num_year_15,win_num_year_15,lose_num_year_15,num_year_7,diff_num_year_7,win_num_year_7,lose_num_year_7,num_year_3,diff_num_year_3,win_num_year_3,lose_num_year_3,\
num_year_2,diff_num_year_2,win_num_year_2,lose_num_year_2,num_year_1,diff_num_year_1,win_num_year_1,lose_num_year_1,\
away_num,away_win_num,away_lose_num,away_win_rate,home_num,home_win_num,home_lose_num,home_win_rate"}
select_columns_valid_data = MLSSelectColumns(**params)
select_columns_valid_data.run()
#@output {"label":"dataframe","name":"select_columns_valid_data.get_outputs()['output_port_1']","type":"DataFrame"}
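Because the same `selected_cols_str` is reused for both sets, the training and validation DataFrames should expose exactly the same columns; a quick assertion sketch:
# The fitted pipeline expects the validation set to carry the training feature columns
train_cols = select_columns_train_data.get_outputs()['output_port_1'].columns
valid_cols = select_columns_valid_data.get_outputs()['output_port_1'].columns
assert train_cols == valid_cols, "train/validation feature columns differ"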
# Train the model, using a logistic regression classifier as an example
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, \
VectorAssembler, IndexToString, StandardScaler
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import NumericType
class MLSLogisticRegressionClassifier:
"""
logistic regression classifier
"""
def __init__(self,
inputs,
b_output_action=True,
b_use_default_encoder=True,
input_features_str=None,
outer_pipeline_stages=None,
label_col=None,
classifier_label_index_col="label_index",
classifier_feature_vector_col="model_features",
prediction_col="prediction",
prediction_index_col="prediction_index",
max_iter=100,
reg_param=0.0,
elastic_net_param=0.0,
tol=1e-6,
fit_intercept=True,
standardization=True,
aggregation_depth=2,
family="auto",
lower_bounds_on_coefficients=None,
upper_bounds_on_coefficients=None,
lower_bounds_on_intercepts=None,
upper_bounds_on_intercepts=None
):
"""
A logistic regression classifier
:param inputs:
dic of upstream node output, should have key: dataframe
:param b_output_action:
If true, the output of this class is a pipeline model;
            If it is false, only the logistic regression classifier pipeline stage is output.
In this case, users can edit the code of the workflow node for custom execution.
(default: True)
:param b_use_default_encoder:
If true, use StringIndexer and OneHotEncoderEstimator for string features;
            use StandardScaler for numerical features; then train a logistic regression classifier
and obtain a pipeline model.
(default: True)
:param dataframe:
Used when b_output_action=true.
:param input_features_str:
Input features, separated by commas.
:param outer_pipeline_stages:
When users edit the code of a workflow node, the stages will be collected in the upper node.
:param label_col:
The target column of the dataframe.
:param classifier_label_index_col:
The label column value of the lr classifier
(default: "label_index")
:param classifier_feature_vector_col:
The feature column of the lr classifier.
(default: "model_features")
:param prediction_col:
Model prediction column name.
(default: "prediction")
:param prediction_index_col
Model prediction index column name.
(default: "prediction_index")
:param max_iter:
The maximum number of iterations
(default: 100)
:param reg_param:
The regularizer parameter.
            (default: 0.0)
:param elastic_net_param:
ElasticNet mixed parameters, the range is [0, 1]. For alpha = 0, the penalty is L2 penalty.
For alpha = 1, this is the L1 penalty.
(default: 0.0)
:param tol:
The convergence tolerance for the iterative algorithms.
            (default: 1e-6)
:param fit_intercept:
Whether to fit an intercept term.
(default: True)
:param standardization:
Whether to standardize the training features before fitting the model.
(default: True)
:param aggregation_depth:
Suggested depth for treeAggregate.
(default: 2)
:param family:
The name of family which is a description of the label distribution to be used in the model,
Supported "auto", "binomial", "multinomial".
(default: "auto")
:param lower_bounds_on_coefficients:
The lower bounds on coefficients if fitting under bound constrained optimization.
(default: None)
:param upper_bounds_on_coefficients:
The upper bounds on coefficients if fitting under bound constrained optimization.
(default: None)
:param lower_bounds_on_intercepts:
The lower bounds on intercepts if fitting under bound constrained optimization.
(default: None)
:param upper_bounds_on_intercepts:
The upper bounds on intercepts if fitting under bound constrained optimization.
(default: None)
"""
self.inputs = inputs
self.b_output_action = b_output_action
self.b_use_default_encoder = b_use_default_encoder
self.input_features_str = input_features_str
self.outer_pipeline_stages = outer_pipeline_stages
self.label_col = label_col
self.classifier_label_index_col = classifier_label_index_col
self.classifier_feature_vector_col = classifier_feature_vector_col
self.prediction_col = prediction_col
self.prediction_index_col = prediction_index_col
self.max_iter = max_iter
self.reg_param = reg_param
self.elastic_net_param = elastic_net_param
self.tol = tol
self.fit_intercept = fit_intercept
self.standardization = standardization
self.aggregation_depth = aggregation_depth
self.family = family
self.lower_bounds_on_coefficients = lower_bounds_on_coefficients
self.upper_bounds_on_coefficients = upper_bounds_on_coefficients
self.lower_bounds_on_intercepts = lower_bounds_on_intercepts
self.upper_bounds_on_intercepts = upper_bounds_on_intercepts
self.dataframe = None
self._input_feature_cols = []
self._df_column_type_map = {}
self.labels = []
self._outputs = {}
def _check_and_solve_input_param_when_output(self):
# check param inputs
if not isinstance(self.inputs, dict):
raise Exception("parameter \"inputs\" should be dict and has key \"dataframe\"")
if "dataframe" not in self.inputs:
raise Exception("parameter \"inputs\" should have key: \"dataframe\"")
self.dataframe = self.inputs["dataframe"]
# check outer_pipeline_stages
if self.outer_pipeline_stages is None:
self.outer_pipeline_stages = []
if not isinstance(self.outer_pipeline_stages, list):
raise Exception("The parameter \"outer_pipeline_stages\" should be a list type.")
# check label_col
if self.label_col is None or not self.label_col.strip():
            raise Exception("The parameter \"label_col\" should be passed.")
self._df_column_type_map = {}
for field in self.dataframe.schema.fields:
self._df_column_type_map[field.name] = field.dataType
if self.label_col not in self._df_column_type_map:
raise Exception("The label column %s doesn't exist in dataframe." % self.label_col)
# check input_features_str
if self.input_features_str is None or not self.input_features_str.strip():
self._input_feature_cols = self.dataframe.columns
self._input_feature_cols.remove(self.label_col)
else:
self._input_feature_cols = [column.strip() for column in
self.input_features_str.split(",")]
def _execute_default_feature_encoder(self):
label_string_indexer = StringIndexer() \
.setInputCol(self.label_col) \
.setOutputCol(self.classifier_label_index_col) \
.setHandleInvalid("skip") \
.fit(self.dataframe)
self.labels = label_string_indexer.labels
self.outer_pipeline_stages.append(label_string_indexer)
categorical_cols = []
numerical_cols = []
other_cols = []
for column in self._input_feature_cols:
if column in self._df_column_type_map:
if isinstance(self._df_column_type_map[column], NumericType):
numerical_cols.append(column)
elif type(self._df_column_type_map[column]) is VectorUDT:
other_cols.append(column)
else:
categorical_cols.append(column)
else:
other_cols.append(column)
for cat_column in categorical_cols:
string_indexer = StringIndexer() \
.setInputCol(cat_column) \
.setHandleInvalid("keep") \
.setOutputCol("%s_index" % cat_column)
self.outer_pipeline_stages.append(string_indexer)
cat_columns_index = ["%s_index" % cat_column for cat_column in
categorical_cols]
cat_columns_onehot = ["%s_onehot" % cat_column for cat_column in
categorical_cols]
onehot_encoder = OneHotEncoderEstimator() \
.setInputCols(cat_columns_index) \
.setHandleInvalid("keep") \
.setOutputCols(cat_columns_onehot)
self.outer_pipeline_stages.append(onehot_encoder)
assembled_features = cat_columns_onehot + numerical_cols + other_cols
        # Spark 2.3.2 does not support VectorAssembler.setHandleInvalid("keep")
vector_assembler = VectorAssembler() \
.setInputCols(assembled_features) \
.setOutputCol("lr_assembled_features")
self.outer_pipeline_stages.append(vector_assembler)
standard_scaler = StandardScaler() \
.setInputCol("lr_assembled_features") \
.setWithMean(False) \
.setWithStd(True) \
.setOutputCol(self.classifier_feature_vector_col)
self.outer_pipeline_stages.append(standard_scaler)
def _execute_self_node_output(self):
if self.b_use_default_encoder:
self._execute_default_feature_encoder()
else:
if len(self._input_feature_cols) == 1:
self.classifier_feature_vector_col = self._input_feature_cols[
0]
else:
                # Spark 2.3.2 does not support setHandleInvalid
vector_assembler = VectorAssembler() \
.setInputCols(self._input_feature_cols) \
.setOutputCol("lr_assembled_features")
self.outer_pipeline_stages.append(vector_assembler)
standard_scaler = StandardScaler() \
.setInputCol("lr_assembled_features") \
.setWithMean(False) \
.setWithStd(True) \
.setOutputCol(self.classifier_feature_vector_col)
self.outer_pipeline_stages.append(standard_scaler)
lr_classifier = self._get_lr_classifier()
if self.b_use_default_encoder:
lr_classifier.setPredictionCol(self.prediction_index_col)
self.outer_pipeline_stages.append(lr_classifier)
if self.b_use_default_encoder:
label_index_to_string = IndexToString() \
.setInputCol(self.prediction_index_col) \
.setOutputCol(self.prediction_col) \
.setLabels(self.labels)
self.outer_pipeline_stages.append(label_index_to_string)
pipeline_model = Pipeline().setStages(self.outer_pipeline_stages).fit(
self.dataframe)
self._outputs = {
"output_port_1": pipeline_model
}
def _add_self_node_to_workflow(self):
self.classifier_feature_vector_col = self.input_features_str
lr_classifier = self._get_lr_classifier()
self._outputs = {"output_port_1": lr_classifier}
def _get_lr_classifier(self):
lr_classifier = LogisticRegression() \
.setFeaturesCol(self.classifier_feature_vector_col) \
.setLabelCol(self.classifier_label_index_col) \
.setMaxIter(self.max_iter) \
.setRegParam(self.reg_param) \
.setElasticNetParam(self.elastic_net_param) \
.setTol(self.tol) \
.setFitIntercept(self.fit_intercept) \
.setStandardization(self.standardization) \
.setAggregationDepth(self.aggregation_depth) \
.setFamily(self.family)
if self.lower_bounds_on_coefficients:
lr_classifier.setLowerBoundsOnCoefficients(self.lower_bounds_on_coefficients)
if self.upper_bounds_on_coefficients:
lr_classifier.setUpperBoundsOnCoefficients(self.upper_bounds_on_coefficients)
if self.lower_bounds_on_intercepts:
lr_classifier.setLowerBoundsOnIntercepts(self.lower_bounds_on_intercepts)
if self.upper_bounds_on_intercepts:
lr_classifier.setUpperBoundsOnIntercepts(self.upper_bounds_on_intercepts)
return lr_classifier
def run(self):
if self.b_output_action:
self._check_and_solve_input_param_when_output()
self._execute_self_node_output()
else:
self._add_self_node_to_workflow()
def get_outputs(self):
return self._outputs
inputs = {
"dataframe": select_columns_train_data.get_outputs()['output_port_1'] #@input {"label":"dataframe","type":"DataFrame"}
}
params = {
"inputs": inputs,
"b_output_action": True,
"b_use_default_encoder": True, #@param {"label": "b_use_default_encoder", "type": "boolean", "required": "true", "helpTip": ""}
"input_features_str": "", #@param {"label": "input_features_str", "type": "string", "required": "false", "helpTip": ""}
"outer_pipeline_stages": None,
"label_col": "win_result", #@param {"label": "label_col", "type": "string", "required": "true", "helpTip": "target label column"}
"classifier_label_index_col": "label_index", #@param {"label": "classifier_label_index_col", "type": "string", "required": "true", "helpTip": ""}
"classifier_feature_vector_col": "model_features", #@param {"label": "classifier_feature_vector_col", "type": "string", "required": "true", "helpTip": ""}
"prediction_col": "prediction", #@param {"label": "prediction_col", "type": "string", "required": "true", "helpTip": ""}
"prediction_index_col": "prediction_index", #@param {"label": "prediction_index_col", "type": "string", "required": "true", "helpTip": ""}
"max_iter": 100, #@param {"label": "max_iter", "type": "integer", "required": "true", "range": "(0,2147483647]", "helpTip": ""}
"reg_param": 0, #@param {"label": "reg_param", "type": "number", "required": "true", "range": "[0,none)", "helpTip": ""}
"elastic_net_param": 0, #@param {"label": "elastic_net_param", "type": "number", "required": "true", "range": "[0,none)", "helpTip": ""}
"tol": 0.000001, #@param {"label": "tol", "type": "number", "required": "true", "range": "(0,none)", "helpTip": ""}
"fit_intercept": True, #@param {"label": "fit_intercept", "type": "boolean", "required": "true", "helpTip": ""}
"standardization": True, #@param {"label": "standardization", "type": "boolean", "required": "true", "helpTip": ""}
"aggregation_depth": 2, #@param {"label": "aggregation_depth", "type": "integer", "required": "true", "range": "(0,2147483647]", "helpTip": ""}
"family": "auto", #@param {"label": "family", "type": "enum", "required": "true", "options":"auto,binomial,multinomial", "helpTip": ""}
"lower_bounds_on_coefficients": None,
"upper_bounds_on_coefficients": None,
"lower_bounds_on_intercepts": None,
"upper_bounds_on_intercepts": None
}
lr_classifier = MLSLogisticRegressionClassifier(**params)
lr_classifier.run()
#@output {"label":"pipeline_model","name":"lr_classifier.get_outputs()['output_port_1']","type":"PipelineModel"}
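If you want to inspect what was learned, the fitted logistic regression stage can be pulled out of the pipeline model; a sketch, assuming the default encoder path above was used and the label is binary:
# Look at the fitted LogisticRegressionModel inside the pipeline
from pyspark.ml.classification import LogisticRegressionModel

trained_pipeline = lr_classifier.get_outputs()['output_port_1']
lr_model = next(stage for stage in trained_pipeline.stages
                if isinstance(stage, LogisticRegressionModel))
print("intercept:", lr_model.intercept)
print("number of coefficients:", lr_model.coefficients.size)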
# Model prediction
class MLSModelPredict:
"""
model predict
"""
def __init__(self,
inputs):
"""
model prediction
        :param inputs:
dic of upstream node output, should have key: dataframe
"""
self.inputs = inputs
self.dataframe = None
self.pipeline_model = None
self._outputs = {}
def _check_and_solve_param(self):
# check param inputs
if not isinstance(self.inputs, dict):
raise Exception("parameter \"inputs\" should be dict and has key \"dataframe\"")
if "dataframe" not in self.inputs:
raise Exception("parameter \"inputs\" should have key: \"dataframe\"")
if "pipeline_model" not in self.inputs:
raise Exception("parameter \"inputs\" should have key: \"pipeline_model\"")
self.dataframe = self.inputs["dataframe"]
self.pipeline_model = self.inputs["pipeline_model"]
def _execute(self):
predict_dataframe = self.pipeline_model.transform(self.dataframe)
self._outputs = {
"output_port_1": predict_dataframe
}
def run(self):
self._check_and_solve_param()
self._execute()
def get_outputs(self):
return self._outputs
inputs = {
"dataframe": select_columns_valid_data.get_outputs()['output_port_1'], #@input {"label":"dataframe","type":"DataFrame"}
"pipeline_model": lr_classifier.get_outputs()['output_port_1'] #@input {"label":"pipeline_model","type":"PipelineModel"}
}
params = {
"inputs": inputs
}
model_predict = MLSModelPredict(**params)
model_predict.run()
#@output {"label":"dataframe","name":"model_predict.get_outputs()['output_port_1']","type":"DataFrame"}
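Before the formal evaluation it can help to eyeball a few predictions next to the actual results; a sketch using the column names defined in the steps above:
# Compare predicted and actual outcomes for a handful of matches
predictions_df = model_predict.get_outputs()['output_port_1']
predictions_df.select("home_team", "away_team", "win_result",
                      "prediction", "probability").show(5, truncate=False)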
# Evaluate the model's prediction results
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql.types import NumericType, Row, StructField, StringType, StructType
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import col
class MLSBinaryClassEvaluation:
"""
binary class evaluation
"""
def __init__(self,
inputs,
label_col,
probability_col="probability",
prediction_index_col="prediction_index",
label_index_col="label_index"
):
"""
init
:param inputs:
dic of upstream node output, should have key: dataframe
:param label_col:
column name of label
:param probability_col: probability column in predict_dataframe, can calculate pr area and roc area with it
:param prediction_index_col: column of prediction index in predict_dataframe,
can calculate precision, recall, f1, confusion matrix with it
:param label_index_col: label index column in predict_dataframe
"""
self.inputs = inputs
self.label_col = label_col
self.probability_col = probability_col
self.prediction_index_col = prediction_index_col
self.label_index_col = label_index_col
self.predict_dataframe = None
self.df_column_type_map = {}
self.result_metric_map = {}
self._outputs = {}
def _check_and_solve_param(self):
# check param inputs
if not isinstance(self.inputs, dict):
raise Exception("parameter \"inputs\" should be dict and has key \"predict_dataframe\"")
if "predict_dataframe" not in self.inputs:
raise Exception("parameter \"inputs\" should have key: \"predict_dataframe\"")
self.predict_dataframe = self.inputs["predict_dataframe"]
# get dataframe column type
for field in self.predict_dataframe.schema.fields:
self.df_column_type_map[field.name] = field.dataType
# check label_col
if not isinstance(self.label_col, str):
raise Exception("parameter \"label_col\" should be str")
if not self.label_col.strip():
raise Exception("should input parameter \"label_col\"")
if self.label_col not in self.df_column_type_map:
raise Exception("column \"%s\" doesn't exist in predict_dataframe" % self.label_col)
# check label_index_col
if self.label_index_col not in self.df_column_type_map:
raise Exception("column \"%s\" doesn't exist in predict_dataframe" % self.label_index_col)
if not isinstance(self.df_column_type_map[self.label_index_col], NumericType):
raise Exception("column \"%s\" should be numeric in predict_dataframe" % self.label_index_col)
# check probability_col
if self.probability_col not in self.df_column_type_map:
raise Exception("column \"%s\" doesn't exist in predict_dataframe" % self.probability_col)
# check prediction_index_col
if self.prediction_index_col not in self.df_column_type_map:
raise Exception("column \"%s\" doesn't exist in predict_dataframe" % self.prediction_index_col)
if not isinstance(self.df_column_type_map[self.prediction_index_col], NumericType):
raise Exception("column \"%s\" should be numeric in predict_dataframe" % self.prediction_index_col)
def _calculate_result_metric_map(self):
# calculate pr area and roc area
metrics_rdd = self.predict_dataframe \
.select(self.probability_col, self.label_index_col) \
.rdd \
.map(lambda row: (float(row[0][1]), float(row[1])))
binary_class_metrics = BinaryClassificationMetrics(metrics_rdd)
pr_roc_map = {
"pr_area": binary_class_metrics.areaUnderPR,
"roc_area": binary_class_metrics.areaUnderROC
}
self.result_metric_map["roc"] = pr_roc_map
# calculate precision,recall,f1
prediction_and_labels = self.predict_dataframe \
.select(self.prediction_index_col, self.label_index_col) \
.rdd \
.map(lambda row: (float(row[0]), float(row[1])))
multi_class_metrics = MulticlassMetrics(prediction_and_labels)
accuracy_map = {
"accuracy": multi_class_metrics.accuracy,
"precision": multi_class_metrics.precision(1.0),
"recall": multi_class_metrics.recall(1.0),
"f1": multi_class_metrics.fMeasure(1.0)
}
self.result_metric_map["accuracy"] = accuracy_map
# calculate confusion matrix
confusion_matrix = multi_class_metrics.confusionMatrix().toArray().tolist()
confusion_matrix_map = {
"confusion_matrix": confusion_matrix
}
self.result_metric_map["confusion_matrix"] = confusion_matrix_map
    def _transform_metric_map_to_result_dataframe(self):
# get schema of result dataframe
result_column_names = ["statistics metric", "statistics value 1", "statistics value 2"]
result_fields = []
for column in result_column_names:
column_field = StructField(column, StringType(), False)
result_fields.append(column_field)
result_schema = StructType(result_fields)
# get row array of result dataframe
result_row_array = []
# add roc to result dataframe
roc_map = self.result_metric_map["roc"]
for (key, value) in roc_map.items():
row_map = {
result_column_names[0]: str(key),
result_column_names[1]: str(value),
result_column_names[2]: ""
}
result_row_array.append(Row(**row_map))
# add accuracy,precision,recall,f1 to result dataframe
accuracy_map = self.result_metric_map["accuracy"]
for (key, value) in accuracy_map.items():
row_map = {
result_column_names[0]: str(key),
result_column_names[1]: str(value),
result_column_names[2]: ""
}
result_row_array.append(Row(**row_map))
# add confusion matrix to result dataframe
# get labels from label index column
label_indexes = self.predict_dataframe.select(self.label_index_col).distinct().sort(
col(self.label_index_col).asc()).rdd.map(lambda row: row[0]).collect()
if len(label_indexes) > 2:
raise Exception("the count of label in \"predict_dataframe\" should less than or equal to 2")
label_rows = self.predict_dataframe.select(self.label_index_col, self.label_col).distinct().rdd.collect()
label_index_map = {}
for row in label_rows:
label_index_map[row[0]] = str(row[1])
confusion_array = self.result_metric_map["confusion_matrix"]["confusion_matrix"]
confusion_show_arr_arr = []
confusion_show_arr_arr.append(["" for value in range(len(label_indexes) + 1)])
header_arr = ["confusion matrix"]
for label in label_indexes:
value = label_index_map[label]
header_arr.append(value)
confusion_show_arr_arr.append(header_arr)
for row_index in range(len(label_indexes)):
value_arr = []
tag = label_index_map[label_indexes[row_index]]
value_arr.append(tag)
for col_index in range(len(label_indexes)):
value_arr.append(str(int(confusion_array[row_index][col_index])))
confusion_show_arr_arr.append(value_arr)
for arr in confusion_show_arr_arr:
row_map = {}
for index in range(len(result_column_names)):
row_map[result_column_names[index]] = arr[index]
result_row_array.append(Row(**row_map))
# create result dataframe
spark = SparkSession.builder.getOrCreate()
result_dataframe = spark.createDataFrame(result_row_array, result_schema)
self._outputs["output_port_1"] = result_dataframe
def _execute(self):
self._calculate_result_metric_map()
        self._transform_metric_map_to_result_dataframe()
def run(self):
self._check_and_solve_param()
self._execute()
def get_outputs(self):
return self._outputs
inputs = {
"predict_dataframe": model_predict.get_outputs()['output_port_1'] #@input {"label":"dataframe","type":"DataFrame"}
}
params = {
"inputs": inputs,
"label_col": "win_result", #@param {"label": "label_col", "type": "string", "required": "true", "helpTip": ""}
"probability_col": "probability", #@param {"label": "probability_col", "type": "string", "required": "true", "helpTip": ""}
"prediction_index_col": "prediction_index", #@param {"label": "prediction_index_col", "type": "string", "required": "true", "helpTip": ""}
"label_index_col": "label_index" #@param {"label": "label_index_col", "type": "string", "required": "true", "helpTip": ""}
}
binary_class_evaluation = MLSBinaryClassEvaluation(**params)
binary_class_evaluation.run()
#@output {"label":"dataframe","name":"binary_class_evaluation.get_outputs()['output_port_1']","type":"DataFrame"}
binary_class_evaluation.get_outputs()['output_port_1'].show()
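As a cross-check on the hand-rolled metrics, the ROC AUC can also be computed with Spark ML's built-in evaluator; a sketch that reuses the prediction DataFrame and the column names configured above:
# Cross-check the ROC AUC with BinaryClassificationEvaluator
from pyspark.ml.evaluation import BinaryClassificationEvaluator

auc_evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="probability",  # the probability vector serves as the ranking score
    labelCol="label_index",
    metricName="areaUnderROC")
print("ROC AUC:", auc_evaluator.evaluate(model_predict.get_outputs()['output_port_1']))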
# Save the model
class MLSSavePipelineModel:
"""
save model
"""
def __init__(self,
inputs,
output_model_path):
"""
save model
        :param inputs:
dic of upstream node output, should have key: pipeline_model
:param output_model_path: pipeline model path in hdfs
"""
self.inputs = inputs
self.output_model_path = output_model_path
self.pipeline_model = None
def _check_and_solve_input_param_when_output(self):
# check param inputs
if not isinstance(self.inputs, dict):
raise Exception("parameter \"inputs\" should be dict and has key \"pipeline_model\"")
if "pipeline_model" not in self.inputs:
raise Exception("parameter \"inputs\" should have key: \"pipeline_model\"")
self.pipeline_model = self.inputs["pipeline_model"]
        # check param output_model_path
if self.output_model_path is None or not isinstance(self.output_model_path,
str) or not self.output_model_path.strip():
raise Exception("should input parameter \"output_model_path\", and type should be str")
def _execute(self):
self.pipeline_model.write().overwrite().save(self.output_model_path)
def run(self):
self._check_and_solve_input_param_when_output()
self._execute()
inputs = {
"pipeline_model": lr_classifier.get_outputs()['output_port_1'] #@input {"label":"pipeline_model","type":"PipelineModel"}
}
params = {
"inputs": inputs,
"output_model_path": "./output/UEFA/LR" #@param {"label":"output_model_path","type":"string","required":"true","helpTip":""}
}
save_model = MLSSavePipelineModel(**params)
save_model.run()
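The persisted pipeline can later be reloaded and applied to new fixtures without retraining; a minimal sketch, with the path matching `output_model_path` above (new_matches_df is a hypothetical DataFrame of upcoming matches):
# Reload the saved pipeline model and reuse it for scoring
from pyspark.ml import PipelineModel

reloaded_model = PipelineModel.load("./output/UEFA/LR")
# reloaded_model.transform(new_matches_df) would score any DataFrame with the same feature columns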
Summary:
Treating this as a binary classification problem, the author obtained AUC = 0.77. What does that mean? Following the author's setup, a win is labeled 1 and a draw or loss is labeled 0. An AUC of 0.5 means the model has no discriminative power and its predictions are essentially random, while values between 0.5 and 1 indicate some predictive ability.
Bear in mind, however, that this approach cannot tell you which individual matches were predicted correctly or incorrectly, and such models tend to overfit (in effect, predicting that the strong favourite at short odds will win), so it is unlikely to pay off in real betting.