在使用Anomaly detection算法时,如果出现 Non existing by clause 的错误提示,可能是由于输入的数据中缺少一些列或者指定的列名有误。可以使用以下代码示例来解决这个问题:
# 导入相关库
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.clustering import KMeans
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
# 准备数据
data = spark.read.format("csv").load("data.csv", inferSchema=True, header=True)
cols = [c for c in data.columns if c not in {"user_id", "label"}]
assembler = VectorAssembler(inputCols=cols, outputCol="features")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures", withStd=True, withMean=False)
# 使用管道将数据处理过程组合起来
pipeline = Pipeline(stages=[assembler, scaler])
pipelineModel = pipeline.fit(data)
dataset = pipelineModel.transform(data)
# 进行聚类
kmeans = KMeans(k=k, seed=1)
model = kmeans.fit(dataset)
predictions = model.transform(dataset)
其中,data.csv是输入的数据文件,包括需要进行聚类的列和相应的标签列,k是聚类的数量。在数据准备阶段,可以根据需要进行列的选择和处理,最后使用PipeLine进行组合。在聚类阶段,只需指定k的数量,并使用fit和transform方法即可完成聚类。