std::inner_product的简单性能测试

最近团队产品中用到了一些机器学习方面的算法,涉及到求向量内积,采取的是最朴素的实现方式(元素乘积循环相加)。有一天路上想到 STL 提供了一个模板函数 std::inner_product ,就好奇 libstdc++ 实现上是否对该算法做了什么优化呢?

于是做了个简单的实验:1000 维 double 类型向量乘积,用 std::inner_product 和朴素方法分别计算10000次,g++ -O2优化。第一轮使用原生 double 类型数组,第二轮使用 vector<double> 容器,分别在三个机器环境下进行了计算。

// Processors | physical = 2, cores = 32, virtual = 12, hyperthreading = no
//     Speeds | 12x2400.186
//     Models | 12xIntel(R) Xeon(R) CPU E5645 @ 2.40GHz
//     Caches | 12x256 KB
//        GCC | version 3.4.5 20051201 (Red Hat 3.4.5-2)
	   
a*b     : std::inner_product(27.934ms), for loop(40.061ms)
a_v*b_v : std::inner_product(27.878ms), for loop(40.04ms)

// Processors | physical = 2, cores = 12, virtual = 12, hyperthreading = no
//     Speeds | 12x2100.173
//     Models | 12xAMD Opteron(tm) Processor 4170 HE
//     Caches | 12x512 KB
//        GCC | version 3.4.5 20051201 (Red Hat 3.4.5-2)

a*b     : std::inner_product(31.242ms), for loop(47.853ms)
a_v*b_v : std::inner_product(31.301ms), for loop(47.815ms)

// Processors | physical = 1, cores = 0, virtual = 1, hyperthreading = no
//     Speeds | 1x2572.652
//     Models | 1xIntel(R) Core(TM) i5-3320M CPU @ 2.60GHz
//     Caches | 1x6144 KB
//        GCC | version 4.7.2 (Ubuntu/Linaro 4.7.2-2ubuntu1)

a*b     : std::inner_product(41.76ms), for loop(33.165ms)
a_v*b_v : std::inner_product(35.913ms), for loop(32.881ms)

可以看出不同环境下 std::inner_product 的表现不尽相同,与朴素的方式相比有优有劣。瞄了一眼 gcc 4.8 的 libstdc++ 的代码,没有注意到 std::inner_product 对基本类型做什么 SSE 指令的优化。不过倒是有个并行计算的版本,可能对超大的向量计算有帮助。

虽然从性能上没有看到明显的优势,但毕竟 std::inner_product 可以简化一个循环的编码,至少可以少测一个分支嘛。而且配合重载函数的后两个 functor 参数,可以做一些有趣的事情,比如算一组数的平方和,比较两个字符串相同字符的数量等。以后呢可以多尝试一下用标准库的算法而不是自己写循环。

寻找最快的Python字符串插入方式

在 MapReduce 分布式计算时有这样一种场景:mapper 输入来自多个不同的数据源,共同点是每行记录第一列是作为 key 的 id 列,reducer 需要根据数据源的不同,进行相应的处理。由于数据到 reducer 阶段已经无法区分来自什么文件,所以一般采取的方法是 mapper 为数据记录打一个 TAG。为了便于使用,我习惯于把这个 TAG 打到数据的第二列(第一列为 id 列,作为 reduce/join 的 key),所以有这样的 mapper 函数:

def mapper1(line):
    l = line.split('\t', 1)
    return "%s\t%s\t%s" % (l[0], 'TAG', l[1])

这样给定输入:

s = "3001	VALUE"

mapper1(s) 的结果就是:

s = "3001	TAG	VALUE"

这是一个潜意识就想到的很直白的函数,但是我今天忽然脑子转筋,陷入了“这是最快的吗”思维怪圈里。于是我就想,还有什么其它方法呢?哦,格式化的表达式可以用 string 的 + 运算来表示:

def mapper2(line):
    l = line.split('\t', 1)
    return l[0] + '\t' + 'TAG' + '\t' + l[1]

上面是故意将 '\t' 分开写,因为一般 TAG 是以变量方式传入的。还有,都说 join 比 + 快,那么也可以这样:

def mapper3(line):
    l = line.split('\t', 1)
    l.insert(1, 'TAG')
    return '\t'.join(l)

split 可能要消耗额外的空间,那就换 find:

def mapper4(line):
    pos = line.find('\t')
    return "%s\t%s\t%s" % (line[0:pos], 'TAG', line[pos+1:])

变态一点儿,第一个数是整数嘛,换成整型输出:

def mapper5(line):
    pos = line.find('\t')
    pid = long(line[0:pos])
    return "%d\t%s\t%s" % (pid, 'TAG', line[pos+1:])

再换个思路,split 可以换成 partition:

def mapper6(line):
    (h,s,t) = line.partition('\t')
    return "%s\t%s\t%s" % (h, 'TAG', t)

或者干脆 ticky 一点儿,用 replace 替换第一个找到的制表符:

def mapper7(line):
    return line.replace('\t', '\t'+'TAG'+'\t', 1)

哇,看一下,原来可选的方法还真不少,而且我相信这肯定没有列举到所有的方法。看到这里,就这几个有限的算法,你猜一下哪个最快?最快的比最慢的快多少?

先把计时方法贴一下:

for i in range(1,8):
    f = 'mapper%d(s)' % i
    su = "from __main__ import mapper%d,s" % i
    print f, ':', timeit.Timer(f, setup=su).timeit()

下面是答案:

mapper1(s) : 1.32489800453
mapper2(s) : 1.2933549881
mapper3(s) : 1.65229916573
mapper4(s) : 1.22059297562
mapper5(s) : 2.60358095169
mapper6(s) : 0.956777095795
mapper7(s) : 0.726199865341

最后胜出的是 mapper7 (tricky 的 replace 方法),最慢的是 mapper5 (蛋疼的 id 转数字方法),最慢的耗时是最慢的约 3.6 倍。最早想到的 mapper1 方法在 7 种方法中排名——第 5!耗时是最快方法的 1.8 倍。考虑到 mapper 足够简单,这个将近一倍的开销还是有一点点意义的。

最后,欢迎回复给出更快的方法!

那些害人的编码“神谕”

同其它领域一样,计算机科学和工程领域也是群星璀璨,有些耀眼的星光甚至刺得我们无法直视,只能匍匐在地上聆听神谕。也正如其它领域一样,虽然大家听到的是同样的话,却有各式各样不同的理解。我这里想讲的,就是我观察到的不同理解引发的现象。

“过早优化是万恶之源。” 这是 Donald Knuth 的一句名言。虽然大部分人都不知道,或者会忘掉前面半句:“We should forget about small efficiencies, say about 97% of the time: premature optimization is the root of all evil.” Knuth 说出这句话时,可能想不到这句话会多么地流行,多么根植在很多人心中,以至于成为程序员偷懒的借口,阻碍进步的动力。因为有了这句话,在你指出别人代码中可以优化的问题时,还必须浪费口舌来解释这样的优化是必要的,不是过早优化或者过度优化。

就我的观察而言,对很多程序员来说,其能力还远远达不到过早优化的地步。但他感觉自己受到了 Knuth 的神启,仿佛具有了某种魔力,不优化代码反而成了一种优越感!关于大多数人是否具备过早优化代码的能力,我可以举几个至今我还觉得神奇的例子。

我供职的公司内部有这样一个模块,隔一两个星期总会挂掉几台服务器,现象是内存占满导致服务器假死或者宕机,但事实上根据请求推算根本不会同时使用那么多内存。最后的排查结果发现,每个线程都有这样一个数据结构,它的内存是只增不减的。当你调用它的 clear 接口,它只会把所有的内存还回自己的内存池里,而不是还给系统。这就导致可供分配的内存越来越少、越来越少...

还是这个模块里,仅仅加载一个几 K 的配置文件,就能够占用超过 1G 的内存。为什么呢?因为它用 char str[MAX_CONF_LEN] 保存配置字符串,用 struct xx_t xx[MAX_XX_NUM] 读取配置,而且这个 struct 中还有嵌套的 struct yy_t y[MAX_YY_NUM] 数组。

该模块是个个例吗?还是这家公司,一个全公司使用的公共日志库,LOGGING 宏定义中直接传一个需要系统调用的函数作为参数,导致无论关不关该级别日志都要进行一次系统调用

这家公司好歹也位列国内顶尖的互联网公司之一,工程师的招聘要求也是极其高的,还会普遍出现这种肆意浪费资源的情况。那么我想对于大部分工程师来说,谈避免“过早优化”、“过度优化”,还为时尚早。

还有一句名言“好代码本身就是最好的文档。当你需要添加一个注释时,你应该考虑如何修改代码才能不需要注释。” 这是 Steve McConnell 说的。同样,大部分人都不知道,或者忘掉后面半句:Good code is its own best documentation. As you're about to add a comment, ask yourself, "How can I improve the code so that this comment isn't needed?" Improve the code and then document it to make it even clearer. 如果你是程序员,回想一下多少次跟别人讨论代码是不是必须要注释时,这句话被引用到;有很多次在写代码时喜爱这句话,又多少次改别人的代码时痛恨这句话。

还是从我个人的观察来看,对很多程序员来说,其编码能力还不足以达到“代码本身就是最好的文档”的地步,包括我自己。敝司招聘过很多顶尖的工程师,有传说中的各种杰出前辈,可能在各种学校、公司内部事迹广为流传。但若是你哪天继承了他的代码遗产,就会发现很多传说中的明星跌落凡尘。成百上千行没有注释,使用一个公共库函数时要么接口就根本没注释只能基本靠猜,要么即使注释也语焉不详让你踩到未注明的大坑。每到这个时候你心里总会暗暗骂娘,后面别人再谈到他的光辉事迹时,你跟随讪笑时心中暗自腹诽:“牛逼个锤子!”

但我想很多人争论的焦点是:“注释是不是不可省略的、要强制执行的?”即使个别人能力真能达到“代码本身就是最好的文档”的地步(我还没见过),我也不建议在团队中传播“注释可以省略”这一想法。因为如果你说“注释可以省略”,可能你会发现大家都理解和实践成“终于可以不写注释了”。如果一个刚刚大学毕业、脑袋里从来没有过 documentation 概念、从来没写过注释的新人进入公司,就“终于可以不写注释了”,那么我想他的代码会很难达到“代码本身就是最好的文档”这个级别。因为他根本没有机会懂得什么叫做 documentation。

在公司里,代码注释深远地影响着团队合作的每个人,以及软件生存期里所有的维护者,甚至会影响自己的职业声誉。所以无论别人怎么想,我对注释这个问题的答案始终是:“注释是不可省略的,越完善越好的,甚至强制执行矫枉过正也没关系的!”

Python操作Excel

老婆单位有时候有一些很大的 Excel 统计报表需要处理,其中最恶心的是跨表的 JOIN 查询。他们通常采取的做法是,把多个 Excel 工作簿合成一个工作簿的多个表格,然后再跑函数(VLOOKUP之类)去查。因为用的函数效率很低,在 CPU 打满的情况下还要跑几个小时。

然后我就看不过去了,我也不懂 Excel,不知道如何优化,但我想用 Python+SQLite 总归是能够实现的。于是就尝试了一把,效果还不错,一分钟以内完成统计很轻松,其中大部分时间主要花在读 Excel 内容上。

1. Python 操作 Excel 的函数库

我主要尝试了 3 种读写 Excel 的方法:

1> xlrd, xlwt, xlutils: 这三个库的好处是不需要其它支持,在任何操作系统上都可以使用。xlrd 可以读取 .xls, .xlsx 文件,非常好用;但因为 xlwt 不能直接修改 Excel 文档,必须得复制一份然后另存为其它文件,而且据说写复杂格式的 Excel 文件会出现问题,所以我没有选它来写 Excel 文件。

2> openpyxl: 这个库也是不需要其它支持的,而且据说对 Office 2007 格式支持得更好。遗憾地是,我经过测试,发现它加载 Excel 文件的效率比 xlrd 慢 3 倍以上,内存使用在 10 倍以上,于是就放弃了。

3> win32com: Python Win32 扩展,这个库需要运行环境为 Windows+Office 对应版本。由于 Python Win32 扩展只是把 COM 接口包装了一下,可以视为与 VBA 完全相同,不会有读写格式上的问题。尝试了一下用 win32com 读取 Excel 文件,效率还是比 xlrd 慢一些。

由于读取效率上 xlrd > win32com > openpyxl,所以我自然选择了 xlrd 用来读取统计报表;而最终输出的报表格式较复杂,所以选择了 win32com 直接操作 Excel 文件。

2. Python 里的关系型数据库

SQLite 是一个非常轻量级的关系型数据库,很多语言和平台都内置 SQLite 支持,也是 iOS 和 Android 上的默认数据库。Python 的标准库里也包含了 sqlite3 库,用起来非常方便。

3. 用 xlrd 读取 Excel 并插入数据库样例

如果数据量不大,直接用 Python 内部数据结构如 dict, list 就够了。但如果读取的几张表数据量都较大,增加个将数据插入数据库的预处理过程就有很大好处。一是避免每次调试都要进行耗时较长的 Excel 文件载入过程;二是能充分利用数据库的索引和 SQL 语句强大功能进行快速数据分析。

#!/usr/bin/python
# -*- coding: gbk -*-

import xlrd
import sqlite3

# 打开数据库文件
device_city_db = sqlite3.connect('device_city.db')
cursor = device_city_db.cursor()

# 建表
cursor.execute('DROP TABLE IF EXISTS device_city')
cursor.execute('CREATE TABLE device_city (device_id char(16) PRIMARY KEY, city varchar(16))')
 
# 打开 device 相关输入 Excel 文件
device_workbook = xlrd.open_workbook('输入.xlsx')
device_sheet = device_workbook.sheet_by_name('设备表')

# 逐行读取 device-城市 映射文件,并将指定的列插入数据库
for row in range(1, device_sheet.nrows):
   device_id = device_sheet.cell(row, 6).value
   if len(device_id) > 16:
       device_id = device_id[0:16]
   if len(device_id) == 0:
       continue
   city = device_sheet.cell(row, 10).value
   # 避免插入重复记录
   cursor.execute('SELECT * FROM device_city WHERE device_id=?', (device_id,))
   res = cursor.fetchone()
   if res == None:
       cursor.execute('INSERT INTO device_city (device_id, city) VALUES (?, ?)',
                      (device_id, city))
   else:
       if res[1] != city:
           print '%s, %s, %s, %s' % (device_id, city, res[0], res[1])
device_city_db.commit()

4. 将结果写入 Excel 文件样例

使用 win32com 写入 Excel 的时候要注意,一定要记得退出 Excel,否则下次运行会出错。这需要增加异常处理语句,我这里偷了个懒,出了异常后要手动杀死任务管理器中的 excel 进程。至于 win32com 中类的接口,可以从 MSDN 网站查阅。

import win32com.client as win32
import os
excel = win32.gencache.EnsureDispatch('Excel.Application')
excel.Visible = False
# 貌似这里只能接受全路径
workbook = excel.Workbooks.Open(os.path.join(os.getcwd(), '输出.xlsx'))
month_sheet = workbook.Worksheets(1)
# 计算文件中实际有内容的行数
nrows = month_sheet.Range('A65536').End(win32.constants.xlUp).Row
# 操作 Excel 单元格的值
for row in range(5, nrows-4):
   month_sheet.Cells(row, 1).Value += something
# 保存工作簿
workbook.Save()
# 退出 Excel
excel.Application.Quit()

Python JSON模块解码中文的BUG

很多语言或协议选择使用 ASCII 字符 “\”(backslash,0x5c) 作为字符串的转义符,包括 JSON 中的字符串。一般来说,使用 Python 中的 JSON 模块编码英文,不会存在转义符的问题。但如果使用 JSON 模块编解码中文,就可能面临着中文字符包含转义符带来的 bug。本篇文章给出了一个 badcase。

中文解码错误

测试用例文件里面包含繁体的“運動”二字,使用 GB18030 编码。使用 json 解码的错误如下:

$ cat decode.dat
{"a":"運動"}
$ python
>>> import json
>>> fp=open('decode.dat', 'r')
>>> json.load(fp, encoding='gb18030')
Traceback (most recent call last):
  File "", line 1, in 
  File "/home/yangwb/local/lib/python2.7/json/__init__.py", line 278, in load
    **kw)
  File "/home/yangwb/local/lib/python2.7/json/__init__.py", line 339, in loads
    return cls(encoding=encoding, **kw).decode(s)
  File "/home/yangwb/local/lib/python2.7/json/decoder.py", line 360, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/home/yangwb/local/lib/python2.7/json/decoder.py", line 376, in raw_decode
    obj, end = self.scan_once(s, idx)
UnicodeDecodeError: 'gb18030' codec can't decode byte 0xdf in position 0: incomplete
multibyte sequence

发生这个问题的原因,就存在于“運”字的编码之中。“運”的 GB18030 编码是 0xdf5c,由于第二个字符与转义符 “\” 编码相同,所以剩下的这个 0xdf 就被认为是一个 incomplete multibyte sequence。

我本来认为,既然已经提供了编码,json 模块就能够区分汉字与转义符(所以我觉得这应该是 json 的一个 bug)。但从实验来看,并非如此。对于一些不需提供字符编码的 JSON 解码器来说,我们倒可以用一种比较 tricky 的方法绕过上面这个问题,即在“運”字后面加一个额外的转义符:

{"a":"運\動"}

遗憾的是,这种方法对 Python 的 json 模块不适用。我仍不知道该如何解决这个解码问题。

中文编码——没错误!

对于相同的 case,Python 倒是能够编码成功:

$ cat in.dat
運動
$ python
>>> import json
>>> in_str = open('in.dat', 'r').read()
>>> out_f = open('out.dat', 'w', 0)
>>> dump_str = json.dumps({'a': in_str}, ensure_ascii=False, encoding='gb18030')
>>> out_f.write(dump_str.encode('gb18030'))
$ cat out.dat
{"a": "運動"}

所以这件事情就把我给搞糊涂了,Python 的 json 模块不能解码自己编码的 json 串。所以我觉得这可能是一个 bug,或者至少是 2.7.1 版本的 bug。

PS: 要仔细看文档

20120516:经网友 TreapDB 提醒,加载字符串时自己做 Unicode 转换,貌似能够解决这个问题。

$ cat decode.dat
{"a":"運動"}
$ python
>>> import json
>>> in_str = open('decode.dat', 'r').read().decode('gb18030')
>>> json.loads(in_str)

回头仔细看了一下 json 的文档,其中有这么一段:

Encodings that are not ASCII based (such as UCS-2) are not allowed, and should be wrapped with codecs.getreader(encoding)(fp), or simply decoded to a unicode object and passed to loads().

已经注明了 encoding 不支持非 ASCII-based 编码的参数,所以应该使用 getreader 进行转码,而不是让 json 模块去转码。看来是我没读懂文档,大惊小怪了,回家面壁去!

>>> json.load(codecs.getreader('gb18030')(fp))

警惕程序日志对性能的影响

做后台系统比做客户端软件的辛苦的地方,就是不能让程序轻易地挂掉。因为在生产环境中无法容易地复现或调试 bug,很多时候需要程序日志提供足够的信息,所以一个后台系统的程序员必须要明白该如何打日志(logging)。

很多语言都有自己现成的 logging 库,比如 Python 标准库中的 logging 模块,Apache 的 log4cxx(C++), log4j(Java)。如果你愿意找,很容易能找到基本满足自己需求的日志程序库。当然,自己实现一个也不是很困难。难点不在于写这些库,而是如何去使用它们。

大部分情况下,我们关注的都是日志的级别和内容。即哪些情况下,该打哪个级别的日志,日志语句中,该怎么写。

在程序开发的过程中,我们需要很多的日志协助分析程序问题;但在生产环境中,我们没有那么多的空间存储丰富的日志,而且日志量太大对于问题排查反而是累赘。有些人使用预处理解决这个问题,在 debug 版本和 release 版本中编译进不同的日志语句。这样能够解决一些问题,但却使得在生产环境中无法轻易地打印更多的日志。大部分人更接受的做法是,使用配置(参数)控制日志的打印级别,在需要更多日志的时候,可以随时打开它们。为了实现日志“少但是足够”的目标,开发人员必须明白日志信息的价值,即哪些日志应该属于哪个级别。

日志的作用是提供信息,但不同的日志语句,提供的信息量却是不一样的。有的日志里会写“Failed to get sth..”,但却忘记加上失败调用的返回值。同程序一样,日志语句中有的是变量(某个变量内容),有的是常量(提示信息)。常量你总能从程序源代码中获得,但变量不行。所以在一条日志中,信息量最大的是变量,是函数返回值/字符串内容/错误码,因而变量应该尽量放在靠前的位置。常量也不是一点价值没有,写得好的提示语句,会使问题一目了然,可以免去你到代码中 grep,然后重读代码的麻烦。

上面这两点,几乎所有知道 logging 重要性的同学都会了解。但关于 logging 对性能的影响,很多人没有足够的警惕心。例如有人会在一个按行解析文件的函数中写下这样的日志:

int parseline(...)
{
log_trace("Enter parseline with ...");
DO_SOMETHING;
log_trace("Exit parseline with ...");
return 0;
}

乍一看,由于 log_trace 级别不高,在生产环境中肯定会关闭,那么这样做看起来对性能没太大影响。但实际上 log_trace 可能是这样实现的:

#define log_trace(fmt, arg...) \
    xx_log(LVL_TRACE, "[%s:%d][time:%uus]" fmt, __FILE__, __LINE__,\
           log_getussecond(), ## arg)
#endif

可以看到 log_trace 宏中自动添加了很多信息,值得注意的是时间参数 log_getussecond()。大家都知道统计时间需要系统调用,那么无论 log_getussecond() 函数是如何实现的,它的代价肯定是高于一般的简单函数。

我们本以为 log_trace 在 LVL_TRACE 级别被关闭的情况下,消耗的代价仅仅是一个函数调用和分支判断,却没有发现宏参数中还隐藏着一个需要调用系统调用的函数。当文件不大是还算能够忍受,但当这个文件是一个数据库,扫描每一行都要执行两次 log_trace() 时,它对系统性能的影响就绝不可忽视了。

所以,最佳的做法还是,在性能攸关的代码中,使用可被预处理掉的 logging 语句,仅仅在 debug 发布中才能见到这些日志,release 版本中不把它们编译进来。

此外,上面这个 log_trace,是一个糟糕的设计。logging 模块只应该干 logging 的事情,开发人员需要时间统计时会自己完成。

修改exvim目录过滤逻辑为匹配拒绝

exVim 是一个非常优秀的 Vim 环境,通过它能够省去很多 Vim 插件的配置工作。自从使用上 exVim 后,我基本没有再自定义 Vim 插件,完全依赖 exVim 打包的辅助功能。

最近让我略有不爽的使用问题是:exVim 默认的 file filter 和 dir filter 都是匹配通过的,即“匹配 filter 过滤条件的目录和文件被通过,列入项目目录、文件列表中”。

exVim 的 dir filter

对于文件来说,设置匹配通过毫无问题。因为我也想要项目中仅包含 “.cpp,.c,.h,.py” 这样的源代码文件,选出来匹配这些模式的文件就是我希望的结果。

但是对于目录来说,设置匹配通过就与我通常的需求相悖了。一般情况下,项目目录下的所有目录都是程序需要的。但是一些专门存放测试程序、测试框架、输出文件的目录,我其实不希望显示在我的项目中。而且 exVim 中的目录过滤貌似仅限在项目顶层目录中,过滤的意义不大。

所以我修改了一下 exVim 的代码,将默认的 dir filter 含义修改为匹配拒绝,即:“匹配 dir filter 的目录被拒绝(被过滤掉),无论它在哪一级。"例如,我将 dir filter 设置为 “test,output”,那么我项目目录下所有叫做 test 或者 output 的子目录都不会显示到项目目录列表中,而不妨碍其它名称目录的通过。

可以想见两个 filter 采用不同的通过逻辑并不是 exVim 开发者希望看到的,所以我想这个修改也没必要提交给开发者。不过我仍然觉得这是很有用的一个修改,所以拿出来分享一下。修改的补丁文件见:http://share.solrex.org/ibuild/exvim-dir_filter-8.05_b2.patch

PS: patch 文件中还有一个改动是将 quick_gen_project_PROJECT_autogen.sh 文件从项目目录下,移动到项目目录下的 .vimfiles.PROJECT/ 目录中,原因是看起来碍眼 :)

std::sort 的仿函数参数

因为习惯了 qsort 的函数指针参数,以前用 std::sort 的时候一般也是传函数指针而不是仿函数(functor)。从很多示例程序来看,貌似没有什么大的不同。但是直到今天我才醒悟,原来是示例太简单了啊!

具体来说,我今天遇到了一个问题:要对一个表进行排序,每个字段可能是升序,可能是降序,也有不同的类型,所以排序的时候需要根据这些信息进行比较。比较函数不能是类成员函数,但我又的确要用到类成员的信息,函数接口又不能变,着实发愁。愁了就只能 Google,发现原来仿函数可以轻松地搞定这件事情。

// 来自 http://stackoverflow.com/a/1902360
class MyClass{

   // ...
   struct doCompare
   {
       doCompare( const MyClass& info ) : m_info(info) { } // only if you really need the object state
       const MyClass& m_info;

       bool operator()( const int & i1, const int & i2  )
       {
            // comparison code using m_info
       }
   };

    doSort()
    { std::sort( arr, arr+someSize, doCompare(*this) ); }
};

简单点儿说,因为仿函数是个类,也可以有成员变量,构造的时候可以传参进去初始化,这样就能实现更灵活的比较方法。这么简单的道理,为什么之前我就是想不到呢?

此外值得一提的是,std::sort 要求比较的结果是 strict weak order,就是说要严格小于才返回 true。这就意味着,仅仅对比较结果取反,是无法实现逆序的。因为小于的取反不是大于,而是大于等于。

我们有过经验,如果相等的时候也返回 true,可能会导致某些标准库实现的 sort 函数指针越界,导致程序出错。所以要千万避免犯这个错误。

Leveldb 编译错误背后的C++标准变化

在编译 Levedb 时,我遇到了这个错误:

g++ -c -I. -I./include -fno-builtin-memcmp -DLEVELDB_PLATFORM_POSIX -pthread -DOS_LINUX -O2 -DNDEBUG db/version_set.cc -o db/version_set.o
db/version_set.cc: In member function `void leveldb::VersionSet::Builder::Apply(leveldb::VersionEdit*)':
./db/version_edit.h:100: error: `std::vector, std::allocator > > leveldb::VersionEdit::compact_pointers_' is private
db/version_set.cc:461: error: within this context
...

在网上容易搜到解决方案,由于归根结底是访问控制问题,方法是把所有涉及到的的 private 变量或类型修改为 public。由于不是所有的编译器都会报错,我就很好奇产生这个错误的根本原因。

BTW: 一种不修改代码的 work around 方法是,在编译这个文件时加上 -fno-access-control 参数,这样 g++ 就不会进行访问控制检查,自然也就没问题了。这个参数同样可以用于对 private 成员函数进行单元测试。

简单地分析一下这个错误。发生错误的地方是在 VersionSet::Builder 这个类的成员函数中,而错误则是其成员函数无法访问 VersionEdit 和 Version 类的私有成员变量。VersionSet 是 VersionEdit 和 Version 类的友元类,Builder 是 VersionSet 的嵌套类。简化一下,代码如下所示:

class VersionSet;

class VersionEdit {
    friend class VersionSet;
    static int compact_pointers_;
};

class VersionSet {
    class Builder {
        int foo()
        {  
            return VersionEdit::compact_pointers_;
        }  
    }; 
};

把这段代码拿给编译器去编译,g++ 3.4.4/5 会报类似的 `int VersionEdit::compact_pointers_' is private 错误,但是 g++ 4.5.3 则能够编译通过。

由于 VersionSet 是 VersionEdit 的友元类,那么 VersionSet 是能够访问 VersionEdit 私有成员的,这样问题就集中在 Builder 是否能够获得与 VersionEdit 的友元关系。如果语法规定嵌套类 Builder 能够从 VersionSet “获得”友元关系,那么 Builder就能够访问 VersionEdit::compact_pointers_,反之就不能访问。

在 C++98 标准中,关于嵌套类的权限有如下描述:

$11.8/1 [class.access.nest],

The members of a nested class have no special access to members of an enclosing class, nor to classes or functions that have granted friendship to an enclosing class; the usual access rules (clause 11) shall be obeyed. The members of an enclosing class have no special access to members of a nested class; the usual access rules (clause 11) shall be obeyed.

Example:

class E {
    int x;
    class B { };
    class I {
        B b;                 // error: E::B is private
        int y;
        void f(E* p, int i) {
           p->x = i;         // error: E::x is private
        }
   };
   int g(I* p)
   {
       return p->y;          // error: I::y is private
   }
};

但是在 C++11 中,这段描述变更为:

$11.7/1 Nested classes [class.access.nest]

A nested class is a member and as such has the same access rights as any other member. The members of an enclosing class have no special access to members of a nested class; the usual access rules (Clause 11) shall be obeyed.

Example:

class E {
    int x;
    class B { };
    class I {
        B b;                  // OK: E::I can access E::B
        int y;
        void f(E* p, int i) {
            p->x = i;         // OK: E::I can access E::x
        }  
    }; 
    int g(I* p) {
        return p->y;          // error: I::y is private
    }  
};

从上面的描述和示例代码对比中我们可以明显看出,在旧标准中嵌套类和“被嵌套类”没有什么特殊的关系,就像两个普通类一样;但是在新标准中嵌套类已经完全视为“被嵌套类”的成员,那么自然也获得了“被嵌套类”成员应该有的访问控制权限。这也就意味着“被嵌套类”的普通成员拥有的访问“被友元类”私有成员变量的权限,嵌套类也能够获得,那么 Leveldb 在新版本的编译器下能够编译通过也不足为奇了。

不过 gcc3.4 的编译错误问题还不能单单归究于标准的变化。因为 gcc3.4 已经能够支持嵌套类访问“被嵌套类”的私有成员(因为在很早以前这就被确认为一个缺陷),只是不能够支持友元关系到嵌套类的传递。友元关系的传递可能是在 4.1 或者 4.2 版本中实现的,应该属于上述标准变化的衍生特性。

通过科目三路考

前天晚上通过科目三路考,我为期半年的驾校学习总算结束了。

想起来这半年也是折腾不断。我是 2 月底参加水木团购版的东方时尚驾校报名团购,3 月中旬考科目一。之后因为懒得约模拟机,光模拟机 6 个小时花了三周时间才上完。后来从淘宝上买了个约车软件,专门用来约周末的散段,效率还挺高的,也就三周就把散段上完了。后来科目二考试也算挺顺利。只是5月该考科目三的时候忽然忙起来了,工作上各种事情,再加上天气热,就把科目三考试拖了下来,一直拖到 9 月。

工作以后考驾照呢,我觉得主要还是得看执行力。执行力高的,两个月以内就能搞定,像我这样执行力差的,光拖就能拖四个月。我自己的一个特性是有压力的时候不希望分心。前段时间在做一个比较大的升级,对我个人的设计能力、架构把握和编码能力都是一个挑战。再加上合作半年的一个同事离职,新加入的同事也不太熟悉,我只好一个人吭哧吭哧把剩下的事情做完。

不过这个项目让我很有成就感!重构了一个 2 万多行的模块,精简到不到 1 万行;重整了 4 万多行代码,架构上进行了很大的调整,脉络上清晰很多。另外也加了些新功能。但是由于我负责的系统涉及产品线太多,平时工作中新需求、沟通、线上问题处理等日常事务占到一大半,很多时候只能加班写代码。这样大的工作量让人有些筋疲力尽。现在项目完成了,我也想休息休息调整一下,就把后续的编码工作让给同事去做,我下面这个月主要任务就是跟进测试和校园招聘的工作了。

算了,又跑题了,最后做个广告吧:如果 2012 应届生同学对百度提供的工作机会感兴趣的话,欢迎邮件简历给我!优秀的朋友我可以帮助内部推荐,相信你知道在哪里能找到我的邮箱地址 :)

epoll 事件之 EPOLLRDHUP

在对系统问题进行排查时,我发现了一个奇怪的现象:明明是对方断开请求,系统却报告一个查询失败的错误,但从用户角度来看请求的结果正常返回,没有任何问题。

对这个现象深入分析后发现,这是一个基于 epoll 的连接池实现上的问题,或者说是特性 :)

首先解释一下导致这个现象的原因。

在使用 epoll 时,对端正常断开连接(调用 close()),在服务器端会触发一个 epoll 事件。在低于 2.6.17 版本的内核中,这个 epoll 事件一般是 EPOLLIN,即 0x1,代表连接可读。

连接池检测到某个连接发生 EPOLLIN 事件且没有错误后,会认为有请求到来,将连接交给上层进行处理。这样一来,上层尝试在对端已经 close() 的连接上读取请求,只能读到 EOF,会认为发生异常,报告一个错误。

因此在使用 2.6.17 之前版本内核的系统中,我们无法依赖封装 epoll 的底层连接库来实现对对端关闭连接事件的检测,只能通过上层读取数据时进行区分处理。

不过,2.6.17 版本内核中增加了 EPOLLRDHUP 事件,代表对端断开连接,关于添加这个事件的理由可以参见 “[Patch][RFC] epoll and half closed TCP connections”。

在使用 2.6.17 之后版本内核的服务器系统中,对端连接断开触发的 epoll 事件会包含 EPOLLIN | EPOLLRDHUP,即 0x2001。有了这个事件,对端断开连接的异常就可以在底层进行处理了,不用再移交到上层。

重现这个现象的方法很简单,首先 telnet 到 server,然后什么都不做直接退出,查看在不同系统中触发的事件码。

注意,在使用 2.6.17 之前版本内核的系统中,sys/epoll.h 的 EPOLL_EVENTS 枚举类型中是没有 EPOLLRDHUP 事件的,所以带 EPOLLRDHUP 的程序无法编译通过。

僵尸对象或 RAII

我最近在想这个问题,到底要不要在程序中使用异常?

以前写的 C 代码比较多,即使写 C++,基本上也是把它当成 C with object 来用。对异常的了解偏少,使用更是极少。最近评审别人代码的时候遇到一个问题:如果构造函数中 new 失败了,会发生什么事情?

工程的代码一般提倡哪里出错在哪里处理,不能恢复的要返回错误码给调用者。在一般情况下,使用 new(std::no_throw) 保证 new 不抛出异常(否则结果是灾难性的),并且检查分配是否成功是可以实现这一点的。

遗憾的是构造函数没有返回值,我们不能返回构造失败。那么只有用迂回的办法,为类定义一个成员变量 bool inited。初始化为 false,只有在构造的工作都完成之后,才将它置为 true。如果一个对象的 inited 成员为 false,就意味着它构造过程中出了问题,不能被使用。这就是一个僵尸对象,“活死人”。

看,我们成功地规避了使用异常。但是慢着,不是只有 bad_alloc 这一个异常啊!还有 bad_cast、runtime_error、logic_error,还有:

$ grep class /usr/include/c++/4.5/stdexcept 
// Standard exception classes  -*- C++ -*-
// ISO C++ 19.1  Exception classes
   *  program runs (e.g., violations of class invariants).
   *  @brief One of two subclasses of exception.
  class logic_error : public exception 
  class domain_error : public logic_error 
  class invalid_argument : public logic_error 
  class length_error : public logic_error 
  class out_of_range : public logic_error 
   *  @brief One of two subclasses of exception.
  class runtime_error : public exception 
  class range_error : public runtime_error 
  class overflow_error : public runtime_error 
  class underflow_error : public runtime_error 

天那,我未曾注意过标准库有那么多异常!那么如果在使用标准库时,不小心触发了什么异常,OMG!

这样看来,使用异常是很有必要的。但是,麻烦的问题又来了,一旦使用异常,函数的退出过程就变了。使用错误码有一个好处,就是你可以在函数返回前擦干净自己的屁股;但是使用异常呢?你既要保证对象能够自己擦屁股(RAII),还要保证函数能自己擦屁股(在正确的位置使用异常处理),这样才能在 stack unwinding 时不会导致内存泄露。哦,auto_ptr 可以帮上一些忙,但如果是分配的资源是数组呢?

还有一个麻烦是,你要遵从约定——特别是对于一个程序库作者来说。如果约定出错时抛出异常,那么可以抛;如果约定出错时返回错误码,或者这个库可能被 C 调用,那么抛出异常就可能是灾难。

现在看来,如果想实现更健壮的 C++ 程序,那么异常处理是不可或缺的。但在使用异常处理之前,必须得了解在哪里、怎样抛出和捕获异常,如果是团队合作,可能还需要有简单的操作指导手册,否则使用不当或者过量的异常也可能带来麻烦。

我还在路上!

编程杂感 20110313

唉,最近表达的欲望很小,这篇日志也仅仅是凑数而已。前一段时间周旋在几个项目之间,忙的没什么时间思考问题或者写字。从上周开始,退出了一个跨部门合作的项目,专心于自己的事情。其中的原因有很多,不好说也不可说。

这半年来,对我所负责的系统,我致力于的是消灭各种 bug,提高稳定性,添加新功能以及为系统的未来发展做一个较为长远的规划。这半年里的程序升级要么从非常微小之处着手,要么是新的模块,擅长的是以最小的代价修复一个具体问题,但不曾仔细思考过如何对一个模块的设计缺陷进行逐步的全面的修正。这种修正类似于重构,但不是推倒重来,而是慢慢地逐步演进。我现在感觉到,在这方面的能力或经验,我还是缺乏的。

这样看来,被分配去做一个已有系统的维护和升级也不是坏事。最近可自己支配的时间多了些,我就沉浸在代码的阅读里。常常在思考的问题是如果让我来,这部分代码的结构该如何设计,才能够提供最大的灵活性,例如易复用、易扩展。好的代码能让你赞叹,差的代码也让你嗟叹,对已有代码的反思和修正,未始不能让自己得到成长。在这个方向上,我和我们的项目都还有很多功课需要做。

在 shell 脚本里打日志

今天小弟在重构代码中的一个脚本模块,其中涉及到日志功能。上午花了点儿时间想出了个在 shell 打日志的技巧,觉得值得写一下。

希望要实现的效果是:实现一个 write_log 命令,给一条出错消息作为输入,write_log 记录日志时自动加上 时间戳、脚本文件名和行号。形如:

2010-12-17 19:13:44 [work.sh:24] FATAL: mkdir -p /x.

时间戳、脚本文件名都好获得,但是行号就没那么容易实现了。shell 中的 $LINENO 变量只能展开成当前行的行号,如果把 write_log 实现成函数的话,势必在函数中无法使用 $LINENO。

开始我想了好大一会儿,觉得 eval 能干这个事情。但是如果用 eval 的话,还不如直接把 $LINENO 传给 write_log 函数呢,与我的初衷不是太相符。我拉来同事讨论了一把,也没解决问题。正当我准备放弃了,计划每次传 $LINENO 参数时,忽然想起来,怎么把 alias 给忘了呢?

于是,write_log 的实现就是这个样子了:

function _write_log()
{
  if [ $# -eq 2 ]; then
    if [ -z $LOGFILE ]; then
      echo "$(date "+%Y-%m-%d %H:%M:%S") [$0:$1] $2"
    else
      echo "$(date "+%Y-%m-%d %H:%M:%S") [$0:$1] $2" >> $LOGFILE
    fi
  elif [ $# -eq 1 ]; then
    if [ -z $LOGFILE ]; then
      echo "$(date "+%Y-%m-%d %H:%M:%S") [$0] $1"
    else
      echo "$(date "+%Y-%m-%d %H:%M:%S") [$0] $1" >> $LOGFILE
    fi
  else
    return 1
  fi
}
alias write_log='_write_log $LINENO' # 这里必须使用单引号

存在的问题是:上面这段代码在 bash 里是不工作的,但是用 sh 可以——即使 sh 也是链接到 bash 的。问题出在 alias 上,可以把问题简化成这样,有一个脚本 a.sh:

$ cat a.sh
alias lss='ls -l'
lss /tmp

这个脚本用 /bin/sh 执行是这样的:

$ sh a.sh 
total 8
drwx------ 2 gdm gdm 4096 2010-12-17 19:34 orbit-gdm
drwx------ 2 gdm gdm 4096 2010-12-17 11:04 pulse-PKdhtXMmr18n

用 /bin/bash 执行是这样的:

$ bash a.sh 
a.sh: line 2: lss: command not found

把 bash 随便 link 成一个叫 sh 的链接文件,再执行是类似这样的:

$ ln -s /bin/bash ~/sh
$ ~/sh a.sh 
total 8
drwx------ 2 gdm gdm 4096 2010-12-17 19:34 orbit-gdm
drwx------ 2 gdm gdm 4096 2010-12-17 11:04 pulse-PKdhtXMmr18n

这个问题肯定是有原因的,我不愿意去翻 bash 源代码,也不知道哪里去找答案,所以我放弃了,直接在文件头加上

#!/bin/sh

如果哪位兄台知道这种“奇怪”现象的原因所在,请不吝赐教 :)

使用 Sikuli 实现同时登录两个 Dropbox 帐户

来自 MIT 的用图片编程的 Sikuli 语言最近着实火了一把,看着对岸的程序员 Vgod 开发出如此酷的软件着实令人羡慕。但除了 Demo 之外,能不能拿 Sikuli 来 engineer a better life 呢?显然是可以的,就如 Vgod 这篇文章所说,Sikuli 有无穷的潜力,那我们就来玩儿一把,展示一下 Sikuli 的一个现实应用。

1. Dropbox

Dropbox 是一个在线文件存储系统,可以用来存储和在不同电脑间共享文件,但是一个 Dropbox 用户只有 2G 的存储空间,当我们文件多的时候,就受到限制了。而一般情况下 Dropbox 只能运行一个例程,使用多个用户貌似不可行。但是到底可能吗?

当然可能,只是我们需要多个 Windows 帐户。也就是说,每个 Windows 帐户可以运行一个 Dropbox,如果你系统里有多个帐户,就可以运行多个 Dropbox。注意,受到安全策略的限制,这些帐户必须设置密码。比如我们新建一个"dropbox"帐户,密码也是"dropbox"。

2. 笨的方法

一般情况下使用其它帐户运行程序的方式为:在程序或者快捷方式上点右键,选择“运行方式”,然后选择“下列用户”,输入你期望的用户和密码(dropbox:dropbox)来执行该程序。

3. 聪明的方法

但是这样做太麻烦了,我们可以用批处理脚本做这件事情:

start D:\Program\Dropbox\Dropbox.exe
runas /user:dropbox D:\Program\Dropbox\Dropbox.exe

但这样还要手工输入密码,有很多种方法可以避免手工输入 runas 密码,但很遗憾它们大多在 Windows XP Home Edition 上不可用。

用 Home Edition 的同志还是得交互式的输入密码。能不能不手工输呢?可以,比如 expect 就是专门处理交互的语言。不过,学起来太麻烦了吧,要不来看看 Sikuli 怎么做?

4. 使用 Sikuli

下面这个图就是完成启动两个 Dropbox 的 Sikuli 程序:

使用 Sikuli 同时启动两个 Dropbox

首先switchApp("cmd")启动 Windows 的命令行,然后wait等待那个提示符出现,然后 type() 键入一行 runas 命令,wait 等待提示输入密码,type 输入密码 dropbox 加回车 \n,bingo,出来一个 dropbox 了,最后再 type 一行启动非 runas 的 dropbox,又出来一个 dropbox。

上述程序运行结果如下图所示:

两个 Dropbox 在运行

好玩吧!Sikuli 程序就是那么简单,我从下载 Sikuli 到完成这个程序大约花了四十分钟的时间,这可比去学 expect 快多了。这下 expact 之类的交互语言在简单的场景下可以无视了。

你可以将 Sikuli 程序导出成一个 .skl 文件,据说可以双击运行,不过我尝试未成功,这是一个遗憾,希望后续版本可以解决这个问题。

5. 注册 Dropbox

您如果对 Dropbox 感兴趣的话,可以点击下面我的两个邀请链接注册,这样咱们的空间都可以增加 250M。本人将非常感谢您的支持。(如果您打算再注册一个的话,最好不要用自己的邀请链接,因为同一台电脑上激活的用户不会奖励空间。)

https://www.dropbox.com/referrals/NTE2NjMyMTU5

Math in CS:置换的轮换分解

随便一本《近世代数》或者《抽象代数》书上在讲到置换群的时候,应该都会讲到这样一个定理:
任何一个置换都可以表示为不相交轮换的乘积,若不计因子的顺序,其分解式是唯一的。

一、简单解释

没有数学背景的人,这句话很难读懂,下面我们来看一个简单的例子。假设我们有这样一个置换 P:

1, 2, 3, 4, 5
2, 5, 4, 3, 1

那么这个置换是什么样的轮换的乘积呢?我们先从 1 出发,1 被换到 2,2 被换到 5,5 又被换到 1,这就是一个轮换;然后再从 3 出发,3 被换到 4,4 又被换到 3,这又是一个轮换。也就是说 P 是两个不相交轮换 (1, 2, 5) 和 (3,4) 的乘积。

二、一个应用:全排列判断问题

下面我们来看这个定理有什么作用,考虑下面这道题目[1][2]:

给一个 n 长的数组,判断它是否为一个 1, 2, ..., n 的全排列,要求在线性时间,常数空间内实现。

我们可以容易看到,每个全排列都可以视为 1, 2, ..., n 上的一个置换。问题就转化为检测该数组是不是一个 1, 2, ..., n 的置换。由本文开头提到的定理可知,我们只需要检查该置换是不是由不相交的轮换构成的即可。

还是上面那个例子,怎么检查

1, 2, 3, 4, 5
2, 5, 4, 3, 1

是不是一个置换呢?首先从 1 开始,a[1]=2,那么再检查 a[a[1]]=a[2]=5,然后再检查a[a[a[1]]]=a[5]=1,这样就发现了一个轮换 (1, 2, 5)。然后接下来检测第二个,第三个轮换...

如何保证检查的高效以及所有轮换都不相交呢?我们每次检查完一个数,就将它置负,这样遇到负值,循环就终止了。如果终止前检查的那个数与起始的数相同,那么我们就发现了一个轮换,否则它就不是一个轮换,说明 P 不是一个置换。由于检查过的轮换中的数字都被置为负值,所以第二个轮换肯定不会与第一个轮换相交。如果到最后所有的数都被置为负值,且循环正常终止,那么说明它们都在不相交的轮换里,那么 P 就是一个置换。

如果想要查找过程不影响最终数组的值,到最后把所有置负的元素都重新置正即可。

代码实现如下[2]:

/* We use a n+1 elements array a[n+1] for convenience. a[0] is used to store
 * the return value, thus is not part of the permutation.  */
int test_perm(int *a, int n)
{
  int i, j;
  if (a == NULL)  return 0;     /* Test input */
  a[0] = 1;
  for (i = 1; i <= n; ++i)      /* Test input */
    if (a[i] < 1 || a[i] > n) { /* Is a[i] in the range 1~n? */
      a[0] = 0;
      return a[0];
    }

  for (i = 1; i <= n; ++i)
    if (a[i] > 0) {
      j = i;
      while (a[j] > 0) {        /* Follow the cycle */
        a[j] = -a[j];
        j = -a[j];
      }
      if (j != i)  a[0] = 0;    /* Test the cycle */
    }

  for (i = 1; i <= n; ++i)
    a[i] = a[i] > 0 ? a[i] : -a[i];

  return a[0];
}

三、另一个应用:100 囚徒碰运气问题

那么这个定理还有其它的用处没有呢?考虑下面这道题目[3][4]:

100 个囚犯,每人有一个从 1 到 100 的不重复不遗漏的号码,国王把这些号码收集起来,打乱放进 100 个箱子里,每个箱子里有且仅有一个号码。囚犯们一个一个地来到 100 个箱子面前,每人可以打开至多 50 个箱子来寻找自己的号码,可以一个一个打开(即可以根据之前箱子里看到的号码来决定后面要打开的箱子)。如果有一个囚犯没有找到自己的号码,那么这 100 个人一起被处死;只有当所有的囚犯都找到了自己的号码,他们才会被国王全部释放。

囚犯们可以在没开箱子前商量对策,但是一但打开了箱子,他就不能告诉别人箱子和号码的对应关系。问他们应该用什么样的策略以保证最大的存活概率?

显然,每个人随机选 50 个箱子打开,100 个人的存活概率会是 1/2 的 100 次方,即1/1267650600228229401496703205376,可以小到忽略不计。但是事实上有一种极简单的办法,其存活概率高达 30% 。至于有没有更好的办法?我不知道。

存活率达 30% 的策略就是:

囚犯打开自己号码对应的箱子,就按照箱子中的号码打开另一个箱子,一直到找到自己号码或者选50 次为止,这样就能保证整体有 30% 的存活概率。

这个策略背后的数学原理是什么呢?其实国王所作的事情,就是一个 1 到 100 元素集合的置换,囚犯所做的事情,就是顺着自己号码所在的轮换找自己号码。那么什么时候所有人都不用死呢?就是这个置换中所有的轮换长度都不大于 50,因为每个囚犯号码的轮换都不大于 50,那么他总能在 50 次以内找到自己的号码。

怎么计算这个概率 P 呢?{这个置换中所有的轮换长度都不大于 50 的概率},就是 1 - {存在轮换长度大于 50 的概率},进而 1 - {存在轮换长度为 51, 52, ..., 100 的概率},由此,我们可以得到下面的等式:

P=1-\frac{1}{100!}\sum_{k=51}^{100}\binom{100}{k}(k-1)!(100-k)!=1-\sum_{k=51}^{100}%20\frac{1}{k}=1-(H_{100}-H_{50})

其中,Hn 代表调和数(Harmonic Number)。虽然调和数没有精确的公式,但是我们知道调和数和自然对数有着密切的联系[5],那么我们就可以用自然对数来近似:

P\approx1-(ln(100)-ln(50))=1-ln(2)\approx0.30685281944005469059[6]

因此,我们可以得到,使用这种策略 100个囚犯的存活概率 P 约为 30%。

[1] http://yueweitang.org/bbs/topic/22
[2] http://fayaa.com/tiku/view/84/
[3] http://tydsh.spaces.live.com/Blog/cns!435F1A315756AD5D!833.entry
[4] http://fayaa.com/tiku/view/141/
[5] http://en.wikipedia.org/wiki/Harmonic_number#Calculation
[6] 求和得到的更精确的结果是:0.31182782068980479698,Bash 代码:

STR="1-("
for i in `seq 51 99`; do
  STR+="1/$i+"
done
STR+="1/100)"
echo $STR | bc -l

Cygwin GCC qsort 函数错误(续)

上一篇文章中提到我在为 qsort 写 compare 函数时犯了一个愚蠢的错误:我脑袋陷入了一个错误的逻辑,以为 compare 函数嘛,就是要 compare 一下,那么我用 '>' 或者 '< ' 这种比较算符就可以满足要求(潜意识里认为 > 会返回 1 或者 -1,显然是错的,上篇文章的评论者 Stephen 开始也犯了同样的直觉错误,不过他马上就醒悟过来了)。我当时脑袋里也犹豫了一下要不要处理相等的情况,后来想快排算法中没有判断相等的情况,那么我没必要加上等号。

这个错误直接导致了快排算法失效。

但是为什么在 Linux 下的 gcc 可以输出正确的排序结果呢?我想了很久,最终还是把 glibc 的代码看了一下,才发现,原来当数组规模比较小时时(数组大小小于物理内存的四分之一),glibc 的 qsort 其实不使用 quick sort(_quicksort),而是使用 merge sort(msort_with_tmp)。而且在 msort_with_tmp 中,对 compare 的处理是比较其返回值是否 <=0,这样排序的结果就是正确的了。[1]

事实上最简单的快排算法是只使用 '<' 号或者 '<='的,比如 Wikipedia 上给出的快排算法,那么我们的 compare 只返回 -1 和 0 行吗?这取决于实现,比如对快排算法的优化中有一个就是对数组中有大量相等元素情况下的优化,其中一种实现 Three-way partition, 就需要使用到三种情况:大于、小于或等于。原始的快排 partition 是将数组按照与 pivot 的比较分为两段,Three-way partition 则是将数组分为三段,中间增加一段与 pivot 值相等的子数组。C 玩具代码的实现如下:

void qsort_3way(int a[], int lo, int hi)
{
  if (hi <= lo) return;
  int lt = lo, gt = hi, i = lt;
  int v = a[lo], t;
  while (i <= gt) {
    if (a[i] < v) {
      t = a[i]; a[i] = a[lt]; a[lt] = t;
      ++i; ++lt;
    } else if (a[i] > v) {
      t = a[i]; a[i] = a[gt]; a[gt] = t;
      --gt;  
    } else i++;
  }
  qsort_3way(a, lo, lt - 1);
  qsort_3way(a, gt + 1, hi);
}

但是 '<' 和 '>' 真的都需要吗?理论上来讲,'>' 是不需要的,我们显然可以将 a[i] > v 改成 v < a[i]。这也是 C++ 里面做的,C++ 中的 sort 函数只需要类重载 '< ' 运算符。但是 C 中并没有这种约定,我们不能预设 qsort 如何拿 compare() 的返回值与 0 比较。因此让 compare() 按照 C 的约定,返回大于、小于和等于 0 的三种情况是绝对正确的而且必要的。

我了解了正确的结果怎么得来的,但是我仍然不知道错误的结果是怎么得来的。看起来 Cygwin 使用的 libc 中没有采取类似 Linux 下 gcc 的策略(比如无法取到物理内存大小?)。quick sort 算法有很多优化的技巧和实现:有的使用 '< ' 符号比较,有的在分支数组足够小时采用插入排序,有的同时使用 '<', '> 两个符号,有的随机取 pivot,有的取三点中值作为 pivot。[2] 没有看到代码和调试,很难判断 Cygwin 的 libc 使用了什么算法(当然,尝试分析不同的输入输出是可以得到规律的,比密码分析还是要简单一些)。

[1] glibc/stdlib/msort.c.
[2] Jon Bentley and M. Douglas McIlroy, "Engineering a sort function", Software - Practice and Experience, Vol. 23 (11), 1249-1265, 1993.

Cygwin GCC qsort 函数错误

我平时在 Windows 下写代码时,经常使用 Cygwin 的 gcc。但是今天我居然发现 Cygwin 下 gcc 的 qsort 函数是错误的!这种基本的函数出错,太让人惊讶了。为了验证是不是代码有错,我使用 tcc 和 Linux 下的 gcc 都编译了同样一段程序,它们两个都输出了期望的结果,只有 Cygwin 的 gcc 是错的。下面是示例代码:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int compare(const void *p, const void *q)
{
  return *(const char *)p > *(const char *)q;
}

int main()
{
  char a[] = "1312515";
  printf("%sn", a);
  qsort(a, strlen(a), sizeof(char), compare);
  printf("%sn", a);
  return 0;
}

按说它应该输出:

1312515
1112355

但是我用 Cygwin gcc 编译后,它居然运行出这样的结果:

1312515
2111355

太诡异了。我尝试调试它,结果 gdb 无法步入 qsort 代码中。谁能告诉我是为什么?

附 Cygwin gcc 信息:

$ gcc -v
Using built-in specs.
Target: i686-pc-cygwin
Configured with: /gnu/gcc/package/gcc4-4.3.2-2/src/gcc-4.3.2/configure --srcdir=/gnu/gcc/package/gcc4-4.3.2-2/src/gcc-4.3.2 --prefix=/usr --exec-prefix=/usr --bindir=/usr/bin --sbindir=/usr/sbin --libexecdir=/usr/sbin --datadir=/usr/share --localstatedir=/var --sysconfdir=/etc --infodir=/usr/share/info --mandir=/usr/share/man --datadir=/usr/share --infodir=/usr/share/info --mandir=/usr/share/man -v --with-gmp=/usr --with-mpfr=/usr --enable-bootstrap --enable-version-specific-runtime-libs --with-slibdir=/usr/bin --libexecdir=/usr/lib --enable-static --enable-shared --enable-shared-libgcc --enable-__cxa_atexit --with-gnu-ld --with-gnu-as --with-dwarf2 --disable-sjlj-exceptions --enable-languages=ada,c,c++,fortran,java,objc,obj-c++ --disable-symvers --enable-libjava --program-suffix=-4 --enable-libgomp --enable-libssp --enable-libada --enable-threads=posix AS=/opt/gcc-tools/bin/as.exe AS_FOR_TARGET=/opt/gcc-tools/bin/as.exe LD=/opt/gcc-tools/bin/ld.exe LD_FOR_TARGET=/opt/gcc-tools/bin/ld.exe
Thread model: posix
gcc version 4.3.2 20080827 (beta) 2 (GCC)

我犯了一个愚蠢的错误,感谢来自 Stephen 的评论

你的compare函数有问题,你的compare函数不会返回负数。修改compare为:
int compare(const void *p, const void *q)
{
return *(const char *)p - *(const char *)q;
}
再编译运行就正确了。

字符串参数的模板函数推导问题(续)

前面一篇文章我们讨论了字符串作为参数的模板函数推导问题,下面我们看一下使用不同字符串参数类型对模板函数实例化的影响。代码如下,在语句后面的注释为该句的输出。该输出是 g++ 编译后产生的输出,主要是因为输出简洁,而且我们这里只关心模板函数的不同实例,并不关心 const 类型。

#include <iostream>
#include <typeinfo>
#include <vector>
#include <string>
using namespace std;

template<typename T>
void foo(const T& t)
{
  cout << "foo: generic(" << t << ") " << typeid(t).name() << endl;
}

template<typename T>
void bar(const T t)
{
  cout << "bar: generic(" << t << ") " << typeid(t).name() << endl;
}

/*
$ c++filt [-t] A1_c A2_c A3_c Ss PKc
char [1]
char [2]
char [3]
std::basic_string<char, std::char_traits<char>, std::allocator<char> >
char const*
*/
int main()
{
  foo("");                              // foo: generic() A1_c
  foo("0");                             // foo: generic(0) A2_c
  foo("01");                            // foo: generic(01) A3_c
  foo(static_cast<string>(""));         // foo: generic() Ss
  foo(static_cast<string>("0"));        // foo: generic(0) Ss
  foo(static_cast<string>("01"));       // foo: generic(01) Ss
  foo(static_cast<const char *>(""));   // foo: generic() PKc
  foo(static_cast<const char *>("0"));  // foo: generic(0) PKc
  foo(static_cast<const char *>("01")); // foo: generic(01) PKc
  foo(*(new string("")));               // foo: generic() Ss
  foo(*(new string("0")));              // foo: generic(0) Ss
  foo(*(new string("01")));             // foo: generic(01) Ss
  bar("");                              // foo: generic() PKc
  bar("0");                             // foo: generic(0) PKc
  bar("01");                            // foo: generic(01) PKc
  bar(static_cast<string>(""));         // foo: generic() Ss
  bar(static_cast<string>("0"));        // foo: generic(0) Ss
  bar(static_cast<string>("01"));       // foo: generic(01) Ss
  bar(static_cast<const char *>(""));   // foo: generic() PKc
  bar(static_cast<const char *>("0"));  // foo: generic(0) PKc
  bar(static_cast<const char *>("01")); // foo: generic(01) PKc
  bar(*(new string("")));               // foo: generic() Ss
  bar(*(new string("0")));              // foo: generic(0) Ss
  bar(*(new string("01")));             // foo: generic(01) Ss
  return 0;
}

基于前一篇博客的分析,我们知道形如 "hello" 的常量字符串在编译时的类型是 char 数组。不同长度的 char 数组,其类型是不一样的,我们可以使用下面语句:

cout << (typeid(char [1]) == typeid(char [2])) << endl;

来验证这一想法。因此,如果我们使用不同长度的字符串作为参数调用 foo,编译器就会为模板函数 foo 实例化不同的实例函数,这一点已经由 foo 的前三个输出验证。我们还可以通过 readelf 来读取目标文件符号表,或者 objdump 查看目标文件反汇编代码中 foo 的实例函数的数量。

$ readelf -s test.o | c++filt -t | less
$ objdump -S test.o | c++filt -t | less

这也就是说,我们使用原始字符串调用了三次 foo,其实是三个不同的实例函数,这样显然会导致目标代码臃肿。那么怎么避免这种情况出现呢?下面我们使用了三种不同的方法,将字符串 static_cast 成 string 或者 const char * 类型,或者使用字符串构造一个 string 对象作为参数,这三种情况都能保证不同(内容)字符串参数的调用使用的是同一个实例化的模板函数。

有没有方法避免类型转换呢?我们可以使用非引用参数类型作为模板函数的模板参数,如 bar 模板函数所示。如前一篇中的分析,此时 char 数组类型会被隐式转换成 char 指针类型,然后进行模板函数推导。所以我们看到即使传的是原始字符串参数,其调用的实例化函数仍然是 char const * 类型的。由于这里类型 T 被推导为 char const * 类型,所以传递的仍然是指针。

但是下面的 string 类型的实例化模板函数实现的就是值传递了,这在函数运行效率上可能会有一些影响。不过现代的函数库对 string 都实现为 copy-on-write(例如 MFC 的 CString 和 Qt 的 QString),我想 STL 的 string 应该也不例外,而 const T 参数并不允许对参数修改,所以效率上的影响应该还是比较小的。只是在语义上与传一个指针就有不同了,假如不限定 T 是 const,那么值传递 string 时,对 string 的修改就无法反映到原来 string 上了。

最后,到底哪个方法好呢?我不知道,我没有足够的实践经验来评论哪种方法更好。我这两篇文章的目的仅仅是探讨一下使用不同形式字符串作为模板函数参数时可能发生的奇怪现象,以及要注意的方面,至于哪种方法更好,可能要留待实际需求来决定。

附:第一段代码的 VS 2008 编译器编译结果执行的输出:

foo: generic() char const [1]
foo: generic(0) char const [2]
foo: generic(01) char const [3]
foo: generic() class std::basic_string,class std::allocator >
foo: generic(0) class std::basic_string
,class std::allocator >
foo: generic(01) class std::basic_string
,class std::allocator >
foo: generic() char const *
foo: generic(0) char const *
foo: generic(01) char const *
foo: generic() class std::basic_string
,class std::allocator >
foo: generic(0) class std::basic_string
,class std::allocator >
foo: generic(01) class std::basic_string
,class std::allocator >
bar: generic () char const *
bar: generic (0) char const *
bar: generic (01) char const *
bar: generic () class std::basic_string
,class std::allocator >
bar: generic (0) class std::basic_string
,class std::allocator >
bar: generic (01) class std::basic_string
,class std::allocator >
bar: generic () char const *
bar: generic (0) char const *
bar: generic (01) char const *
bar: generic () class std::basic_string
,class std::allocator >
bar: generic (0) class std::basic_string
,class std::allocator >
bar: generic (01) class std::basic_string
,class std::allocator >