为 Mahout 增加聚类评估功能

聚类算法及聚类评估 Silhouette 简介

聚类算法简介

聚类(clustering)是属于无监督学习(Unsupervised learning)的一种,用来把一组数据划分为几类,每类中的数据尽可能的相似,而不同类之间尽可能的差异最大化。通过聚类,可以为样本选取提供参考,或进行根源分析,或作为其它算法的预处理步骤。

聚类算法中,最经典的要属于 Kmeans 算法,它的基本思想是:假设我们要把一组数据聚成 N 类,那就:

把数据中的每个样本作为一个向量,记作Ā

首先随机选取 n 个样本,把这 n 个样本作为 N 类的中心点, 称为 centroid

针对数据中的所有样本,计算到 n 个 centroid 的距离,距离哪个中心点最近,就属于哪一类

在每一类中,重新选取 centroid,假设该类有 k 个样本,则 centroid 为

重复 2,3 直到 centroid 的变化小于预设的值。

Mahout 是一个开源的机器学习软件,提供了应用推荐、聚类、分类、Logistic 回归分析等算法。特别是由于结合了 Hadoop 的大数据处理能力,每个算法都可以作为独立的 job 方便的部署在 Hadoop 平台上,因此得到了越来越广的应用。在聚类领域,Mahout 提供了 Kmeans,LDA, Canopy 等多种算法。

聚类评估算法 Silhouette 简介

在 Kmeans 中,我们会注意到需要我们预先设置聚合成几类。实际上,在聚类的过程中我们也不可能预先知道,那只能分成 2 类,3 类,……n 类这样进行尝试,并评估每次的聚类效果。

实际上,由于聚类的无监督学习特性,无论什么算法都需要评估效果。在聚类的评估中,有基于外部数据的评价,也有单纯的基于聚类本身的评价,其基本思想就是:在同一类中,各个数据点越近越好,并且和类外的数据点越远越好;前者称为内聚因子(cohension),后者称为离散因子(separation)。

把这两者结合起来,就形成了评价聚类效果的 Silhouette 因子:

首先看如何评价一个点的聚类效果:

a = 一个点到同一聚类内其它点的平均距离

b=min(一个点到其他聚类内的点的平均距离)

Silhouette 因子s = 1 – a/b (a<b) 或b/a -1 (a>=b)

衡量整体聚类的效果,则是所有点的 Silhouette 因子的平均值。范围应该在 (-1,1), 值越大则说明聚类效果越好。

图 1.Silhouette 中内聚、离散因子示意

以图 1 为例。图 1 显示的是一个具有 9 个点的聚类,三个圆形表示聚成了三类,其中的黄点表示质心(centroid)。为了评估图 1 中深蓝色点的聚类效果,其内聚因子a就是该点到所在圆中其它三个点的平均距离。离散因子b的计算相对复杂:我们需要先求出到该点到右上角圆中的三个点的平均距离,记为 b1;然后求出该点到右下角圆中两个点的平均距离,记为 b2;b1 和 b2 的较小值则为b。

在 IBM 的 SPSS Clementine 中,也有 Silhouett 评估算法的实现,不过 IBM 提供的是一个简化版本,把一个点到一个类内的距离的平均值,简化为到该类质心(centroid)的距离,具体来说,就是:

图 2.IBM 关于内聚、离散因子的简化实现

还是以上面描述的 9 个点聚成 3 类的例子来说明。IBM 的实现把a的实现简化为到深蓝色的点所在的质心的距离。计算b时候,还是要先计算 b1 和 b2,然后求最小值。但 b1 简化为到右上角圆质心的距离;b2 简化为到右下角圆质心的距离。

在下面的内容中,我们尝试利用 IBM 简化后的公式为 Mahout 增加聚类评估功能。

Mahout 聚类过程分析

Mahout 运行环境简介

前面说过,Mahout 是依赖 Hadoop 环境,每一个算法或辅助功能都是作为 Hadoop 的一个单独的 job 来运行,所以必须准备好一个可运行的 Hadoop 环境,(至少本文写作时候使用的 Mahout0.9 还在依赖 Hadoop),如何安装配置一个可运行的 Hadoop 环境不在这篇文章的介绍范围内。请自行参考 Hadoop 网站。需要说明的是,本文采用的 Hadoop 为 2.2.0。

安装完 Hadoop 后,下载 mahout-distribution-0.9,解压缩后的重要内容如下:

bin/: 目录下有 Mahout 可执行脚本

mahout-examples-0.9-job.jar,各种算法的实现类

example/ 各种实现算法的源码

conf/ 存放各实现类的配置文件,其中重要的为 driver.classes.default.props,如果增加实现算法类,可以在该文件中增加配置项,从而可以被 Mahout 启动脚本调用。

单独执行 Mahout,是一个实现的各种功能的简介,如下例:

执行 /data01/shanlei/src/mahout-distribution-0.9/bin/mahout

输出:

MAHOUT_LOCAL is not set; adding

HADOOP_CONF_DIR to classpath.

Running on hadoop, using /data01/shanlei/hadoop-2.2.0/bin/hadoop and

HADOOP_CONF_DIR=/data01/shanlei/hadoop-2.2.0/conf

p1 is org.apache.mahout.driver.MahoutDriver

MAHOUT-JOB:

/data01/shanlei/src/mahout-distribution-0.9/examples/target/mahout-examples-0.9-job.jar

An example program must be given as the first argument.

Valid program names are:

arff.vector: : Generate Vectors from an ARFF file or directory

assesser: : assesse cluster result using silhoueter algorithm

baumwelch: : Baum-Welch algorithm for unsupervised HMM training

canopy: : Canopy clustering

cat: : Print a file or resource as the logistic regression models would see it

cleansvd: : Cleanup and verification of SVD output

clusterdump: : Dump cluster output to text

clusterpp: : Groups Clustering Output In Clusters

cmdump: : Dump confusion matrix in HTML or text formats

concatmatrices: : Concatenates 2 matrices of same cardinality into a single matrix

……

如果要执行某种算法,如上面结果中显示的 canopy,就需要执行 mahout canopy 加上该算法需要的其它参数。

另外,Mahout 算法的输入输出,都是在 Hadoop HDFS 上,因此需要通过 hdfs 命令上传到 hdfs 文件系统;输出大多为 Mahout 特有的二进制格式,需要通过 mahout seqdumper 等命令来导出并转换为可读文本。

准备输入

Mahout 算法使用的 input 需要特定格式的 Vector 文件,不能够直接使用一般的文本文件,因此需要把文本转换为 Vector 文件,好在 Mahout 自身提供了这样的类:

org.apache.mahout.clustering.conversion.InputDriver。

在 Mahout 的 conf 目录中的 driver.classes.default.props 增加如下行:

org.apache.mahout.clustering.conversion.InputDriver = input2Seq : create sequence file from blank separated files,然后就可以为 Mahout 增加一个功能,把空格分隔的文本文件转换为 Mahout 聚类可以使用的向量。

如下面的数据所示,该数据每行为一个包含 6 个属性的向量:

1 4 3 11 4 3

2 2 5 2 10 3

1 1 2 2 10 1

1 4 2 11 5 4

1 1 3 2 10 1

2 4 5 9 5 2

2 6 5 3 8 1

执行 ./mahout input2Seq -i /shanlei/userEnum -o /shanlei/vectors 则产生聚类需要的向量文件。

聚类

以 Kmeans 聚类为例:

./mahout kmeans –input /shanlei/vectors –output /shanlei/kmeans -c /shanlei/k –maxIter 5 -k 8 –cl

-k 8 指明产生 8 类,执行完成后,在/shanlei/kmeans/下会产生: clusters-0,clusters-1,… …,clusters-n-final 目录,每个目录都是一次迭代产生的 centroids, 目录数会受 –maxIter 控制;最后的结果会加上 final。

利用 Mahout 的 clusterdump 功能我们可以查看聚类的结果:

./mahout clusterdump -i /shanlei/kmeans/clusters-2-final -o ./centroids.txt

more centroids.txt:

VL-869{n=49 c=[1.163, 5.082, 4.000,

4.000, 4.592, 2.429] r=[0.370, 0.965, 1.030, 1.245, 1.244, 1.161]}

VL-949{n=201 c=[1.229, 4.458, 4.403,

10.040, 6.134, 1.458] r=[0.420, 1.079, 0.836, 1.196, 1.392, 0.852]}

… …

VL-980{n=146 c=[1.281, 2.000, 4.178,

2.158, 9.911, 1.918] r=[0.449, 0.712, 1.203, 0.570, 0.437, 1.208]}

VL-869 中的 869 为该类的 id,c=[1.163, 5.082, 4.000, 4.000, 4.592, 2.429] 为 centroid 的坐标,n=49 表示该类中数据点的个数。

如果使用-cl 参数,则在/shanlei/kmeans/下会产生 clusteredPoints,利用 Mahout 的 seqdumper 可以看其内容:

Input Path:

hdfs://rac122:18020/shanlei/kmeans/clusteredPoints/part-m-00000

Key class: class

org.apache.hadoop.io.IntWritable Value Class: class

org.apache.mahout.clustering.classify.WeightedPropertyVectorWri

table

Key: 301: Value: wt: 1.0 distance:

6.6834472852629006 vec: 1 = [1.000, 4.000, 3.000, 11.000, 4.000, 3.000]

Key: 980: Value: wt: 1.0 distance:

2.3966504034528384 vec: 2 = [2.000, 2.000, 5.000, 2.000, 10.000, 3.000]

Key 对应的则是相关聚类的 id,distance 为到 centroid 的距离。vec 则是原始的向量。

从 Mahout 的聚类输出结果来看,能够很容易的实现 IBM 简化后的 Silhouette 算法,内聚因子 (a) 可以简单的获取到,而离散因子 (b) 也能够简单的计算实现。下面我们就来设计 Mahout 中的实现。

Mahout 中 Silhouette 实现

算法设计:

遵循 Hadoop 上 MR 程序的设计原则,算法设计考虑了 mapper,reducer 及 combiner 类。

Mapper 设计:

输入目录:聚类的最终结果目录 clusteredPoints(通过命令行参数-i 设置),

输入:

Key:IntWritable,Value:WeightedPropertyVectorWritable

输出:

Key:IntWritable(无意义,常量 1),Value:Text(单个点的 Silhouette 值,格式为“cnt,Silhouette 值”)

Setup 过程:

因为需要计算 separation 时候要访问其它的 centroids,所以在 setup 中读取(通过命令行参数-c 设置)并缓存。

Map 过程:

由于输入的 Value 为 WeightedPropertyVectorWritable,可以通过访问字段 distance获得参数 a,并遍历缓存的 centroids,针对其 id 不等于 Key 的,逐一计算距离,其最小的就是参数 b。

Map 的结果 Key 使用常量 1,Value 为形如“1,0.23”这样的“cnt,Silhouette 值”格式。

Reducer 设计:

输入:

Key:IntWritable(常量 1),Value: Text (combine 后的中间 Silhouette 值,格式为“cnt,Silhouette 值”)。

输出:

Key:IntWritable(常量 1),Value:整个聚类的 Silhouette 值,格式为“cnt,Silhouette 值”。

输出目录:最终文件的产生目录,通过命令行参数-o 设置。

Reduce 过程:

根据“,”把每个 Value,分解为 cnt,和 Silhouette,最后进行加权平均。

Combiner 设计:

为减少数据的 copy,采用 combiner,其实现即为 reducer 的实现。

实现代码:

Mapper 类:

public class AssesserMapper extends Mapper<IntWritable,

WeightedPropertyVectorWritable, IntWritable, Text> {

private List<Cluster> clusterModels;

private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);

protected void setup(Context context) throws IOException, InterruptedException {

super.setup(context);

Configuration conf = context.getConfiguration();

String clustersIn = conf.get(ClusterClassificationConfigKeys.CLUSTERS_IN);

clusterModels = Lists.newArrayList();

if (clustersIn != null && !clustersIn.isEmpty()) {

Path clustersInPath = new Path(clustersIn);

clusterModels = populateClusterModels(clustersInPath, conf);

}

}

private static List<Cluster> populateClusterModels(Path clustersIn,

Configuration conf) throws IOException {

List<Cluster> clusterModels = Lists.newArrayList();

Path finalClustersPath = finalClustersPath(conf, clustersIn);

Iterator<?> it = new SequenceFileDirValueIterator<Writable>(finalClustersPath, PathType.LIST,

PathFilters.partFilter(), null, false, conf);

while (it.hasNext()) {

ClusterWritable next = (ClusterWritable) it.next();

Cluster cluster = next.getValue();

cluster.configure(conf);

clusterModels.add(cluster);

}

return clusterModels;

}

private static Path finalClustersPath(Configuration conf,

Path clusterOutputPath) throws IOException {

FileSystem fileSystem = clusterOutputPath.getFileSystem(conf);

FileStatus[] clusterFiles = fileSystem.listStatus(clusterOutputPath,

PathFilters.finalPartFilter());

log.info(“files: {}”, clusterOutputPath.toString());

return clusterFiles[0].getPath();

}

protected void map(IntWritable key, WeightedPropertyVectorWritable vw, Context context)

throws IOException, InterruptedException {

int clusterId=key.get();

double cohension,separation=-1,silhouete;

Map<Text,Text> props=vw.getProperties();

cohension=Float.valueOf(props.get(new Text(“distance”)).toString());

Vector vector = vw.getVector();

for ( Cluster centroid : clusterModels) {

if (centroid.getId()!=clusterId) {

DistanceMeasureCluster distanceMeasureCluster = (DistanceMeasureCluster) centroid;

DistanceMeasure distanceMeasure = distanceMeasureCluster.getMeasure();

double f = distanceMeasure.distance(centroid.getCenter(), vector);

if (f<separation || separation<-0.5) separation=f;

}

}

Text value=new Text(Long.toString(1)+”,”+Double.toString(silhouete));

IntWritable okey=new IntWritable();

okey.set(1);

context.write(okey, value);

}

}

Reducer 类:

public class AssesserReducer extends Reducer<IntWritable, Text, IntWritable, Text> {

private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);

protected void setup(Context context) throws IOException, InterruptedException {

super.setup(context);

log.info(“reducer”);

}

private static final Pattern SEPARATOR = Pattern.compile(“[t,]”);

public void reduce(IntWritable key, Iterable<Text> values,

Context context)

throws IOException, InterruptedException {

long cnt=0;

double total=0;

for (Text value : values) {

String[] p=SEPARATOR.split(value.toString());

Long itemCnt=Long.parseLong(p[0]);

double v=Double.parseDouble(p[1]);

total=total+ itemCnt*v;

cnt=cnt+itemCnt;

}

}

}

Job 类:

public class ClusterAssesser extends AbstractJob {

private ClusterAssesser() {

}

public int run(String[] args) throws Exception {

addInputOption();

addOutputOption();

//addOption(DefaultOptionCreator.methodOption().create());

addOption(DefaultOptionCreator.clustersInOption()

.withDescription(“The input centroids”).create());

if (parseArguments(args) == null) {

return -1;

}

Path input = getInputPath();

Path output = getOutputPath();

Path clustersIn = new Path(getOption(DefaultOptionCreator.CLUSTERS_IN_OPTION));

if (getConf() == null) {

setConf(new Configuration());

}

run(getConf(), input, clustersIn, output);

return 0;

}

private void run(Configuration conf, Path input, Path clustersIn,

Path output)throws IOException, InterruptedException,

ClassNotFoundException {

conf.set(ClusterClassificationConfigKeys.CLUSTERS_IN, clustersIn.toUri().toString());

Job job = new Job(conf, “Cluster Assesser using silhouete over input: ” + input);

job.setJarByClass(ClusterAssesser.class);

job.setInputFormatClass(SequenceFileInputFormat.class);

job.setOutputFormatClass(SequenceFileOutputFormat.class);

job.setMapperClass(AssesserMapper.class);

job.setCombinerClass(AssesserReducer.class);

job.setReducerClass(AssesserReducer.class);

job.setNumReduceTasks(1);

job.setOutputKeyClass(IntWritable.class);

job.setOutputValueClass(Text.class);

FileInputFormat.addInputPath(job, input);

FileOutputFormat.setOutputPath(job, output);

if (!job.waitForCompletion(true)) {

throw new InterruptedException(“Cluster Assesser Job failed processing ” + input);

}

}

private static final Logger log = LoggerFactory.getLogger(ClusterAssesser.class);

public static void main(String[] args) throws Exception {

ToolRunner.run(new Configuration(), new ClusterAssesser(), args);

}

}

编译运行:

编译环境准备:

在从 Mahout 网站下载的包中,同时包含了源码以及可以导入到 eclipse 的工程,导入后,会产生 mahout-core,mahout-distribution,mahout-example 等不同的 projects,我们首先编译一遍,保证没有错误,然后再考虑如何增加自己的代码。

当然,Mahout 在顶层目录也提供了一个编译脚本:compile.sh, 可以在命令行完成编译。

代码编译:

把自己的代码放到 example/src/main/java/目录下,自动编译就可以了。输出产生的类:com.ai.cluster.assesser.ClusterAssesser,然后就被打包到了 examples/target/mahout-examples-0.9-job.jar 中。

配置:

把 examples/target/mahout-examples-0.9-job.jar 覆盖顶层的 mahout-examples-0.9-job.jar

通过在 conf/driver.classes.default.props 文件添加如下行,把我们的实现类加入到 Mahout 的配置中,从而可以通过 Mahout 脚本执行:

com.ai.cluster.assesser.ClusterAssesser = assesser : assesse cluster result using silhoueter algorithm

运行

利用前面我们做聚类过程分析产生的聚类结果:

bin/mahout assesser -i /shanlei/kmeans/clusteredPoints -o /shanlei/silhouete -c

/shanlei/kmeans –tempDir /shanlei/temp

其中的-c 为输入聚类的中心点,-i 为聚类的点 –o 为最终的输出。

查看结果:

bin/mahout seqdumper -i /shanlei/silhouete -o ./a.txt

more a.txt:

Input Path: hdfs://rac122:18020/shanlei/silhouete/part-r-00000

Key class: class org.apache.hadoop.io.IntWritable Value Class: class org.apache.hadoop.io.Text

Key: 1: Value: 1000,0.5217678842906524

Count: 1

1000 表示共 1000 个点,0.52176 为聚类的 Silhouette 值。大于 0.5,看起来效果还行。

结束语:

不同于其它的套件,Mahout 从发布起就是为处理海量数据、为生产而准备的。直到现在,Mahout 的重心还是在优化各种算法上面,对易用性考虑不多,而且学习成本也很高。但 Mahout 不仅仅提供某些特定的算法,而且还把前期准备中的数据清洗,转换,以及后续的效果评估、图形化展现都集成在一块,方便用户。这不仅是一种发展趋势,也是争取用户的一个关键因素。希望大家都能够加入进来,提供各种各样的辅助功能,让 Mahout 变得易用起来。

来源URL:http://www.ynpxrz.com/n1003881c2029.aspx