C++ 多线程调用 Lua 的正确姿势

上一篇文章《轻量级的 C++ Lua 传参方法 - Protobuf 反射》提到我正在 C++ 项目中使用简单的 Lua 脚本做一些灵活的程序逻辑,这样可以把多个 Lua 脚本放在数据库或者缓存里,根据不同的条件选择执行不同的脚本,而且可以在线更新这些脚本。

但在 C++ 项目中集成 Lua,遇到的第一个问题就是多线程问题。Lua 本身是不知晓宿主程序线程环境的,所以 lua_State 的多线程访问是不安全的。而多线程又是服务端程序的天然特性,我暂时还不想把所有逻辑都托管给 Lua,用它自己的多线程机制。所以看起来有下面几个选择:

一是把 Lua 脚本也看成是一个独立服务,通过 RPC 或者消息队列的方式去调用它,在 Lua 脚本内部处理性能问题。这样做相当于用 Lua 写了个服务,脚本复杂度比较高,偏离了我的本意。

二是在每个 C++ 线程里,都创建一个独立的 Lua VM。每个线程有自己的上下文,也自然就互不干扰了。这样做会浪费一些内存,但考虑到 Lua VM 几十 K 级别的大小,对服务端来说根本不算什么开销。但创建线程级的 Lua VM,内存管理上又有不同的方法。

__thread 修饰符只能约束 POD 变量,的确可以存 lua_State 指针,可是它不支持指针的销毁,必须自己额外管理 lua_State 对象,然后再把指针传给线程变量。thread_local 的确能支持复杂类型,可以在析构里销毁 lua_State,可又要求 C++11。思来想去,用pthread_key_create/delete + pthread_get/setspecific 是一个相对稳妥又较为简单的方法,即不用额外自动管理内存,又能实现在线程结束后自动析构线程自己的 Lua VM。

三是将 C++ 的线程与 Lua 的线程对应起来,使用同一个 Lua VM,但在每个 C++ 线程中都用lua_newthread 创建一个 Lua 线程 State 指针。不过在创建线程那一刻,仍然需要对主 lua_State 加锁。这其实相当于给每个 C++ 线程都创建了一个独立的 Lua 堆栈,这样在传参和执行脚本的时候就不担心有数据冲突。理论上来讲效果应该与二是类似的。

四是使用一个线程安全的对象池。将 lua_State 指针放到对象池里,需要的时候拿出来,用完再放回去,由对象池来管理创建和销毁。这就需要一个额外的内存管理容器,代码量大一些,只是对很多成熟的产品来说,可能本身就有这样的轮子。

考虑到代码量,复杂度等问题,我实际在项目中采取了二方案。不过我对 Lua 的了解还不深入,不知道是否还有更好的办法?

一种轻量级 C++ Lua 传参方法 - Protobuf 反射

虽然很多动态语言(例如 PHP)的性能在近些年有了大幅度的提升,也得到了更广泛的应用,但是在一些对性能要求比较严苛的场合,C/C++ 还是有着难以替代的优势。可 C/C++ 最大的缺点就是它的不够灵活,很小一点修改都必须得重新编译,部署,重启上线。为了增强 C/C++ 的灵活性,很多项目都选择嵌入 Lua 解析器来处理程序逻辑中的动态部分,我们也不例外。

目前我们对 Lua 的使用还是比较保守,主要是封装了一些基于特定条件的排序或者过滤规则。它的特点就是传入参数较多,但返回值特别少,基本上就是一个数字或者布尔值。最开始是使用的原始方法,手工去拼 Lua Table 作为传入参数,每加一个参数,就要手写几行添加元素的代码。最近我看到 brpc 里的 pb2json ,忽然想到完全可以用 Protobuf 的反射机制,自动拼 Lua Table。下面是基本类型的转换方法,当然,也可以用类似的方法对 Protobuf 的 map, message 等高级数据结构进行进一步封装。

void ProtoMessageToLuaTable(const google::protobuf::Message &message, lua_State *L) {
    lua_newtable(L);
    const Descriptor* descriptor = message.GetDescriptor();
    const Reflection* reflection = message.GetReflection();
    int field_count = descriptor->field_count();
    for (int i = 0; i < field_count; ++i) {
        const FieldDescriptor* field = descriptor->field(i);
        switch (field->type()) {
        case FieldDescriptor::TYPE_BOOL:
            lua_pushboolean(L, reflection->GetBool(message, field));
            break;
        case FieldDescriptor::TYPE_UINT32:
            lua_pushinteger(L, reflection->GetUInt32(message, field));
            break;
        case FieldDescriptor::TYPE_UINT64:
            lua_pushinteger(L, reflection->GetUInt64(message, field));
            break;
        case FieldDescriptor::TYPE_INT32:
        case FieldDescriptor::TYPE_SINT32:
            lua_pushinteger(L, reflection->GetInt32(message, field));
            break;
        case FieldDescriptor::TYPE_INT64:
        case FieldDescriptor::TYPE_SINT64:
            lua_pushinteger(L, reflection->GetInt64(message, field));
            break;
        case FieldDescriptor::TYPE_FLOAT:
            lua_pushnumber(L, static_cast<double>(reflection->GetFloat(message, field)));
            break;
        case FieldDescriptor::TYPE_DOUBLE:
            lua_pushnumber(L, reflection->GetDouble(message, field));
            break;
        case FieldDescriptor::TYPE_STRING:
            lua_pushstring(L, reflection->GetString(message, field).c_str());
            break;
        default:
            lua_pushnil(L);
            break;
        }
        lua_setfield(L, -2, field->name().c_str());
    }
}

其实调研了一下,发现还有一些其它的方法,比如 luabind, sol2一堆库。但这些工具更适合 C++ Lua 交互比较复杂的场合,而且也引入了额外的依赖和额外的要求(比如 C++11)。对于像我们这样的简单场景,在不引入更多依赖的情况下使用 Protobuf 反射机制,不失为一个好的选择。

寻找更快的平方根倒数算法

机器学习的模型相关计算中,有很多诡异的运算。单个运算的开销很不起眼,但如果这些运算的量足够大,也会对性能产生一定的影响。这里谈的就是一个简单的运算:

a = b / sqrt(c);

对于 C/C++ 语言的程序员来说,sqrt 已经是非常基础的库函数,它的底层实现也仅仅是简单的一句 FSQRT (双精度是 SQRTSD) 指令,看起来没有什么优化的余地。但事实上 intel 提供了一个更快的指令,那就是 SQRTSS,利用这条指令,平方根倒数的计算速度可以达到 sqrt 版本的两倍(实测,与[1]相同)。你可以这样使用它:

#include <xmmintrin.h>
...
__m128 in = _mm_load_ss(&c);
__m128 out = _mm_sqrt_ss(in);
_mm_store_ss(&c, out);
a = b/c;
...

但这就是优化的尽头了么?不,单就求平方根倒数来说,还有一个神奇的近似算法,叫做 Fast Inverse Square Root平方根倒数速算法)。一个神人在 Quake III Arena 游戏中使用了一个神奇的数字 0x5f3759df,创造了这个神奇的算法,这个算法可以将平方根倒数的计算速度提升到 sqrt 的 3 倍多(实测,效果比[1]差)。

float Q_rsqrt( float number )
{
        long i;
        float x2, y;
        const float threehalfs = 1.5F;
 
        x2 = number * 0.5F;
        y  = number;
        i  = * ( long * ) &y;                       // evil floating point bit level hacking
        i  = 0x5f3759df - ( i >> 1 );               // what the fuck?
        y  = * ( float * ) &i;
        y  = y * ( threehalfs - ( x2 * y * y ) );   // 1st iteration
//      y  = y * ( threehalfs - ( x2 * y * y ) );   // 2nd iteration, this can be removed
 
        return y;
}

但 3 倍就是优化的尽头了么?很不幸,邪恶的 Intel 提供了这样一条指令 RSQRTSS,从硬件上支持了这个近似算法。利用这条指令,平方根倒数的计算速度能够达到 sqrt 版本的 6 倍以上!!!

#include <xmmintrin.h>
...
    __m128 in = _mm_load_ss(&c);
    __m128 out = _mm_rsqrt_ss(in);
    _mm_store_ss(&c, out);
    a = b*c;
...

虽然平方根倒数速算法只是一种近似算法,并且只有单精度版本,但是对 RSQRTSS 指令的简单测试发现大部分情况下误差在万分之一以下,指令说明书中给出的误差是 ±1.5*2^-12[2],在非精确数值计算的工程系统中已经足够用了。

它带来的一个更有趣的后果是:如果使用 RSQRTSS 计算出来 c 的平方根倒数,然后再乘以 c,就得到了 c 的平方根近似值。用它可以反过来加速 sqrt 的运算![1]

注1:编译相关程序时,需要打开优化开关,以实现函数的 inline
注2:RSQRTSS 和 SQRTSS 均有一个向量版本,如 RSQRTPS,可以同时计算 4 个 float 的平方根倒数;

[1] Timing square root
[2] RSQRTSS

std::inner_product的简单性能测试

最近团队产品中用到了一些机器学习方面的算法,涉及到求向量内积,采取的是最朴素的实现方式(元素乘积循环相加)。有一天路上想到 STL 提供了一个模板函数 std::inner_product ,就好奇 libstdc++ 实现上是否对该算法做了什么优化呢?

于是做了个简单的实验:1000 维 double 类型向量乘积,用 std::inner_product 和朴素方法分别计算10000次,g++ -O2优化。第一轮使用原生 double 类型数组,第二轮使用 vector<double> 容器,分别在三个机器环境下进行了计算。

// Processors | physical = 2, cores = 32, virtual = 12, hyperthreading = no
//     Speeds | 12x2400.186
//     Models | 12xIntel(R) Xeon(R) CPU E5645 @ 2.40GHz
//     Caches | 12x256 KB
//        GCC | version 3.4.5 20051201 (Red Hat 3.4.5-2)
	   
a*b     : std::inner_product(27.934ms), for loop(40.061ms)
a_v*b_v : std::inner_product(27.878ms), for loop(40.04ms)

// Processors | physical = 2, cores = 12, virtual = 12, hyperthreading = no
//     Speeds | 12x2100.173
//     Models | 12xAMD Opteron(tm) Processor 4170 HE
//     Caches | 12x512 KB
//        GCC | version 3.4.5 20051201 (Red Hat 3.4.5-2)

a*b     : std::inner_product(31.242ms), for loop(47.853ms)
a_v*b_v : std::inner_product(31.301ms), for loop(47.815ms)

// Processors | physical = 1, cores = 0, virtual = 1, hyperthreading = no
//     Speeds | 1x2572.652
//     Models | 1xIntel(R) Core(TM) i5-3320M CPU @ 2.60GHz
//     Caches | 1x6144 KB
//        GCC | version 4.7.2 (Ubuntu/Linaro 4.7.2-2ubuntu1)

a*b     : std::inner_product(41.76ms), for loop(33.165ms)
a_v*b_v : std::inner_product(35.913ms), for loop(32.881ms)

可以看出不同环境下 std::inner_product 的表现不尽相同,与朴素的方式相比有优有劣。瞄了一眼 gcc 4.8 的 libstdc++ 的代码,没有注意到 std::inner_product 对基本类型做什么 SSE 指令的优化。不过倒是有个并行计算的版本,可能对超大的向量计算有帮助。

虽然从性能上没有看到明显的优势,但毕竟 std::inner_product 可以简化一个循环的编码,至少可以少测一个分支嘛。而且配合重载函数的后两个 functor 参数,可以做一些有趣的事情,比如算一组数的平方和,比较两个字符串相同字符的数量等。以后呢可以多尝试一下用标准库的算法而不是自己写循环。

Leveldb 编译错误背后的C++标准变化

在编译 Levedb 时,我遇到了这个错误:

g++ -c -I. -I./include -fno-builtin-memcmp -DLEVELDB_PLATFORM_POSIX -pthread -DOS_LINUX -O2 -DNDEBUG db/version_set.cc -o db/version_set.o
db/version_set.cc: In member function `void leveldb::VersionSet::Builder::Apply(leveldb::VersionEdit*)':
./db/version_edit.h:100: error: `std::vector, std::allocator > > leveldb::VersionEdit::compact_pointers_' is private
db/version_set.cc:461: error: within this context
...

在网上容易搜到解决方案,由于归根结底是访问控制问题,方法是把所有涉及到的的 private 变量或类型修改为 public。由于不是所有的编译器都会报错,我就很好奇产生这个错误的根本原因。

BTW: 一种不修改代码的 work around 方法是,在编译这个文件时加上 -fno-access-control 参数,这样 g++ 就不会进行访问控制检查,自然也就没问题了。这个参数同样可以用于对 private 成员函数进行单元测试。

简单地分析一下这个错误。发生错误的地方是在 VersionSet::Builder 这个类的成员函数中,而错误则是其成员函数无法访问 VersionEdit 和 Version 类的私有成员变量。VersionSet 是 VersionEdit 和 Version 类的友元类,Builder 是 VersionSet 的嵌套类。简化一下,代码如下所示:

class VersionSet;

class VersionEdit {
    friend class VersionSet;
    static int compact_pointers_;
};

class VersionSet {
    class Builder {
        int foo()
        {  
            return VersionEdit::compact_pointers_;
        }  
    }; 
};

把这段代码拿给编译器去编译,g++ 3.4.4/5 会报类似的 `int VersionEdit::compact_pointers_' is private 错误,但是 g++ 4.5.3 则能够编译通过。

由于 VersionSet 是 VersionEdit 的友元类,那么 VersionSet 是能够访问 VersionEdit 私有成员的,这样问题就集中在 Builder 是否能够获得与 VersionEdit 的友元关系。如果语法规定嵌套类 Builder 能够从 VersionSet “获得”友元关系,那么 Builder就能够访问 VersionEdit::compact_pointers_,反之就不能访问。

在 C++98 标准中,关于嵌套类的权限有如下描述:

$11.8/1 [class.access.nest],

The members of a nested class have no special access to members of an enclosing class, nor to classes or functions that have granted friendship to an enclosing class; the usual access rules (clause 11) shall be obeyed. The members of an enclosing class have no special access to members of a nested class; the usual access rules (clause 11) shall be obeyed.

Example:

class E {
    int x;
    class B { };
    class I {
        B b;                 // error: E::B is private
        int y;
        void f(E* p, int i) {
           p->x = i;         // error: E::x is private
        }
   };
   int g(I* p)
   {
       return p->y;          // error: I::y is private
   }
};

但是在 C++11 中,这段描述变更为:

$11.7/1 Nested classes [class.access.nest]

A nested class is a member and as such has the same access rights as any other member. The members of an enclosing class have no special access to members of a nested class; the usual access rules (Clause 11) shall be obeyed.

Example:

class E {
    int x;
    class B { };
    class I {
        B b;                  // OK: E::I can access E::B
        int y;
        void f(E* p, int i) {
            p->x = i;         // OK: E::I can access E::x
        }  
    }; 
    int g(I* p) {
        return p->y;          // error: I::y is private
    }  
};

从上面的描述和示例代码对比中我们可以明显看出,在旧标准中嵌套类和“被嵌套类”没有什么特殊的关系,就像两个普通类一样;但是在新标准中嵌套类已经完全视为“被嵌套类”的成员,那么自然也获得了“被嵌套类”成员应该有的访问控制权限。这也就意味着“被嵌套类”的普通成员拥有的访问“被友元类”私有成员变量的权限,嵌套类也能够获得,那么 Leveldb 在新版本的编译器下能够编译通过也不足为奇了。

不过 gcc3.4 的编译错误问题还不能单单归究于标准的变化。因为 gcc3.4 已经能够支持嵌套类访问“被嵌套类”的私有成员(因为在很早以前这就被确认为一个缺陷),只是不能够支持友元关系到嵌套类的传递。友元关系的传递可能是在 4.1 或者 4.2 版本中实现的,应该属于上述标准变化的衍生特性。

僵尸对象或 RAII

我最近在想这个问题,到底要不要在程序中使用异常?

以前写的 C 代码比较多,即使写 C++,基本上也是把它当成 C with object 来用。对异常的了解偏少,使用更是极少。最近评审别人代码的时候遇到一个问题:如果构造函数中 new 失败了,会发生什么事情?

工程的代码一般提倡哪里出错在哪里处理,不能恢复的要返回错误码给调用者。在一般情况下,使用 new(std::no_throw) 保证 new 不抛出异常(否则结果是灾难性的),并且检查分配是否成功是可以实现这一点的。

遗憾的是构造函数没有返回值,我们不能返回构造失败。那么只有用迂回的办法,为类定义一个成员变量 bool inited。初始化为 false,只有在构造的工作都完成之后,才将它置为 true。如果一个对象的 inited 成员为 false,就意味着它构造过程中出了问题,不能被使用。这就是一个僵尸对象,“活死人”。

看,我们成功地规避了使用异常。但是慢着,不是只有 bad_alloc 这一个异常啊!还有 bad_cast、runtime_error、logic_error,还有:

$ grep class /usr/include/c++/4.5/stdexcept 
// Standard exception classes  -*- C++ -*-
// ISO C++ 19.1  Exception classes
   *  program runs (e.g., violations of class invariants).
   *  @brief One of two subclasses of exception.
  class logic_error : public exception 
  class domain_error : public logic_error 
  class invalid_argument : public logic_error 
  class length_error : public logic_error 
  class out_of_range : public logic_error 
   *  @brief One of two subclasses of exception.
  class runtime_error : public exception 
  class range_error : public runtime_error 
  class overflow_error : public runtime_error 
  class underflow_error : public runtime_error 

天那,我未曾注意过标准库有那么多异常!那么如果在使用标准库时,不小心触发了什么异常,OMG!

这样看来,使用异常是很有必要的。但是,麻烦的问题又来了,一旦使用异常,函数的退出过程就变了。使用错误码有一个好处,就是你可以在函数返回前擦干净自己的屁股;但是使用异常呢?你既要保证对象能够自己擦屁股(RAII),还要保证函数能自己擦屁股(在正确的位置使用异常处理),这样才能在 stack unwinding 时不会导致内存泄露。哦,auto_ptr 可以帮上一些忙,但如果是分配的资源是数组呢?

还有一个麻烦是,你要遵从约定——特别是对于一个程序库作者来说。如果约定出错时抛出异常,那么可以抛;如果约定出错时返回错误码,或者这个库可能被 C 调用,那么抛出异常就可能是灾难。

现在看来,如果想实现更健壮的 C++ 程序,那么异常处理是不可或缺的。但在使用异常处理之前,必须得了解在哪里、怎样抛出和捕获异常,如果是团队合作,可能还需要有简单的操作指导手册,否则使用不当或者过量的异常也可能带来麻烦。

我还在路上!

Math in CS:置换的轮换分解

随便一本《近世代数》或者《抽象代数》书上在讲到置换群的时候,应该都会讲到这样一个定理:
任何一个置换都可以表示为不相交轮换的乘积,若不计因子的顺序,其分解式是唯一的。

一、简单解释

没有数学背景的人,这句话很难读懂,下面我们来看一个简单的例子。假设我们有这样一个置换 P:

1, 2, 3, 4, 5
2, 5, 4, 3, 1

那么这个置换是什么样的轮换的乘积呢?我们先从 1 出发,1 被换到 2,2 被换到 5,5 又被换到 1,这就是一个轮换;然后再从 3 出发,3 被换到 4,4 又被换到 3,这又是一个轮换。也就是说 P 是两个不相交轮换 (1, 2, 5) 和 (3,4) 的乘积。

二、一个应用:全排列判断问题

下面我们来看这个定理有什么作用,考虑下面这道题目[1][2]:

给一个 n 长的数组,判断它是否为一个 1, 2, ..., n 的全排列,要求在线性时间,常数空间内实现。

我们可以容易看到,每个全排列都可以视为 1, 2, ..., n 上的一个置换。问题就转化为检测该数组是不是一个 1, 2, ..., n 的置换。由本文开头提到的定理可知,我们只需要检查该置换是不是由不相交的轮换构成的即可。

还是上面那个例子,怎么检查

1, 2, 3, 4, 5
2, 5, 4, 3, 1

是不是一个置换呢?首先从 1 开始,a[1]=2,那么再检查 a[a[1]]=a[2]=5,然后再检查a[a[a[1]]]=a[5]=1,这样就发现了一个轮换 (1, 2, 5)。然后接下来检测第二个,第三个轮换...

如何保证检查的高效以及所有轮换都不相交呢?我们每次检查完一个数,就将它置负,这样遇到负值,循环就终止了。如果终止前检查的那个数与起始的数相同,那么我们就发现了一个轮换,否则它就不是一个轮换,说明 P 不是一个置换。由于检查过的轮换中的数字都被置为负值,所以第二个轮换肯定不会与第一个轮换相交。如果到最后所有的数都被置为负值,且循环正常终止,那么说明它们都在不相交的轮换里,那么 P 就是一个置换。

如果想要查找过程不影响最终数组的值,到最后把所有置负的元素都重新置正即可。

代码实现如下[2]:

/* We use a n+1 elements array a[n+1] for convenience. a[0] is used to store
 * the return value, thus is not part of the permutation.  */
int test_perm(int *a, int n)
{
  int i, j;
  if (a == NULL)  return 0;     /* Test input */
  a[0] = 1;
  for (i = 1; i <= n; ++i)      /* Test input */
    if (a[i] < 1 || a[i] > n) { /* Is a[i] in the range 1~n? */
      a[0] = 0;
      return a[0];
    }

  for (i = 1; i <= n; ++i)
    if (a[i] > 0) {
      j = i;
      while (a[j] > 0) {        /* Follow the cycle */
        a[j] = -a[j];
        j = -a[j];
      }
      if (j != i)  a[0] = 0;    /* Test the cycle */
    }

  for (i = 1; i <= n; ++i)
    a[i] = a[i] > 0 ? a[i] : -a[i];

  return a[0];
}

三、另一个应用:100 囚徒碰运气问题

那么这个定理还有其它的用处没有呢?考虑下面这道题目[3][4]:

100 个囚犯,每人有一个从 1 到 100 的不重复不遗漏的号码,国王把这些号码收集起来,打乱放进 100 个箱子里,每个箱子里有且仅有一个号码。囚犯们一个一个地来到 100 个箱子面前,每人可以打开至多 50 个箱子来寻找自己的号码,可以一个一个打开(即可以根据之前箱子里看到的号码来决定后面要打开的箱子)。如果有一个囚犯没有找到自己的号码,那么这 100 个人一起被处死;只有当所有的囚犯都找到了自己的号码,他们才会被国王全部释放。

囚犯们可以在没开箱子前商量对策,但是一但打开了箱子,他就不能告诉别人箱子和号码的对应关系。问他们应该用什么样的策略以保证最大的存活概率?

显然,每个人随机选 50 个箱子打开,100 个人的存活概率会是 1/2 的 100 次方,即1/1267650600228229401496703205376,可以小到忽略不计。但是事实上有一种极简单的办法,其存活概率高达 30% 。至于有没有更好的办法?我不知道。

存活率达 30% 的策略就是:

囚犯打开自己号码对应的箱子,就按照箱子中的号码打开另一个箱子,一直到找到自己号码或者选50 次为止,这样就能保证整体有 30% 的存活概率。

这个策略背后的数学原理是什么呢?其实国王所作的事情,就是一个 1 到 100 元素集合的置换,囚犯所做的事情,就是顺着自己号码所在的轮换找自己号码。那么什么时候所有人都不用死呢?就是这个置换中所有的轮换长度都不大于 50,因为每个囚犯号码的轮换都不大于 50,那么他总能在 50 次以内找到自己的号码。

怎么计算这个概率 P 呢?{这个置换中所有的轮换长度都不大于 50 的概率},就是 1 - {存在轮换长度大于 50 的概率},进而 1 - {存在轮换长度为 51, 52, ..., 100 的概率},由此,我们可以得到下面的等式:

P=1-\frac{1}{100!}\sum_{k=51}^{100}\binom{100}{k}(k-1)!(100-k)!=1-\sum_{k=51}^{100}%20\frac{1}{k}=1-(H_{100}-H_{50})

其中,Hn 代表调和数(Harmonic Number)。虽然调和数没有精确的公式,但是我们知道调和数和自然对数有着密切的联系[5],那么我们就可以用自然对数来近似:

P\approx1-(ln(100)-ln(50))=1-ln(2)\approx0.30685281944005469059[6]

因此,我们可以得到,使用这种策略 100个囚犯的存活概率 P 约为 30%。

[1] http://yueweitang.org/bbs/topic/22
[2] http://fayaa.com/tiku/view/84/
[3] http://tydsh.spaces.live.com/Blog/cns!435F1A315756AD5D!833.entry
[4] http://fayaa.com/tiku/view/141/
[5] http://en.wikipedia.org/wiki/Harmonic_number#Calculation
[6] 求和得到的更精确的结果是:0.31182782068980479698,Bash 代码:

STR="1-("
for i in `seq 51 99`; do
  STR+="1/$i+"
done
STR+="1/100)"
echo $STR | bc -l

Cygwin GCC qsort 函数错误(续)

上一篇文章中提到我在为 qsort 写 compare 函数时犯了一个愚蠢的错误:我脑袋陷入了一个错误的逻辑,以为 compare 函数嘛,就是要 compare 一下,那么我用 '>' 或者 '< ' 这种比较算符就可以满足要求(潜意识里认为 > 会返回 1 或者 -1,显然是错的,上篇文章的评论者 Stephen 开始也犯了同样的直觉错误,不过他马上就醒悟过来了)。我当时脑袋里也犹豫了一下要不要处理相等的情况,后来想快排算法中没有判断相等的情况,那么我没必要加上等号。

这个错误直接导致了快排算法失效。

但是为什么在 Linux 下的 gcc 可以输出正确的排序结果呢?我想了很久,最终还是把 glibc 的代码看了一下,才发现,原来当数组规模比较小时时(数组大小小于物理内存的四分之一),glibc 的 qsort 其实不使用 quick sort(_quicksort),而是使用 merge sort(msort_with_tmp)。而且在 msort_with_tmp 中,对 compare 的处理是比较其返回值是否 <=0,这样排序的结果就是正确的了。[1]

事实上最简单的快排算法是只使用 '<' 号或者 '<='的,比如 Wikipedia 上给出的快排算法,那么我们的 compare 只返回 -1 和 0 行吗?这取决于实现,比如对快排算法的优化中有一个就是对数组中有大量相等元素情况下的优化,其中一种实现 Three-way partition, 就需要使用到三种情况:大于、小于或等于。原始的快排 partition 是将数组按照与 pivot 的比较分为两段,Three-way partition 则是将数组分为三段,中间增加一段与 pivot 值相等的子数组。C 玩具代码的实现如下:

void qsort_3way(int a[], int lo, int hi)
{
  if (hi <= lo) return;
  int lt = lo, gt = hi, i = lt;
  int v = a[lo], t;
  while (i <= gt) {
    if (a[i] < v) {
      t = a[i]; a[i] = a[lt]; a[lt] = t;
      ++i; ++lt;
    } else if (a[i] > v) {
      t = a[i]; a[i] = a[gt]; a[gt] = t;
      --gt;  
    } else i++;
  }
  qsort_3way(a, lo, lt - 1);
  qsort_3way(a, gt + 1, hi);
}

但是 '<' 和 '>' 真的都需要吗?理论上来讲,'>' 是不需要的,我们显然可以将 a[i] > v 改成 v < a[i]。这也是 C++ 里面做的,C++ 中的 sort 函数只需要类重载 '< ' 运算符。但是 C 中并没有这种约定,我们不能预设 qsort 如何拿 compare() 的返回值与 0 比较。因此让 compare() 按照 C 的约定,返回大于、小于和等于 0 的三种情况是绝对正确的而且必要的。

我了解了正确的结果怎么得来的,但是我仍然不知道错误的结果是怎么得来的。看起来 Cygwin 使用的 libc 中没有采取类似 Linux 下 gcc 的策略(比如无法取到物理内存大小?)。quick sort 算法有很多优化的技巧和实现:有的使用 '< ' 符号比较,有的在分支数组足够小时采用插入排序,有的同时使用 '<', '> 两个符号,有的随机取 pivot,有的取三点中值作为 pivot。[2] 没有看到代码和调试,很难判断 Cygwin 的 libc 使用了什么算法(当然,尝试分析不同的输入输出是可以得到规律的,比密码分析还是要简单一些)。

[1] glibc/stdlib/msort.c.
[2] Jon Bentley and M. Douglas McIlroy, "Engineering a sort function", Software - Practice and Experience, Vol. 23 (11), 1249-1265, 1993.

Cygwin GCC qsort 函数错误

我平时在 Windows 下写代码时,经常使用 Cygwin 的 gcc。但是今天我居然发现 Cygwin 下 gcc 的 qsort 函数是错误的!这种基本的函数出错,太让人惊讶了。为了验证是不是代码有错,我使用 tcc 和 Linux 下的 gcc 都编译了同样一段程序,它们两个都输出了期望的结果,只有 Cygwin 的 gcc 是错的。下面是示例代码:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

int compare(const void *p, const void *q)
{
  return *(const char *)p > *(const char *)q;
}

int main()
{
  char a[] = "1312515";
  printf("%sn", a);
  qsort(a, strlen(a), sizeof(char), compare);
  printf("%sn", a);
  return 0;
}

按说它应该输出:

1312515
1112355

但是我用 Cygwin gcc 编译后,它居然运行出这样的结果:

1312515
2111355

太诡异了。我尝试调试它,结果 gdb 无法步入 qsort 代码中。谁能告诉我是为什么?

附 Cygwin gcc 信息:

$ gcc -v
Using built-in specs.
Target: i686-pc-cygwin
Configured with: /gnu/gcc/package/gcc4-4.3.2-2/src/gcc-4.3.2/configure --srcdir=/gnu/gcc/package/gcc4-4.3.2-2/src/gcc-4.3.2 --prefix=/usr --exec-prefix=/usr --bindir=/usr/bin --sbindir=/usr/sbin --libexecdir=/usr/sbin --datadir=/usr/share --localstatedir=/var --sysconfdir=/etc --infodir=/usr/share/info --mandir=/usr/share/man --datadir=/usr/share --infodir=/usr/share/info --mandir=/usr/share/man -v --with-gmp=/usr --with-mpfr=/usr --enable-bootstrap --enable-version-specific-runtime-libs --with-slibdir=/usr/bin --libexecdir=/usr/lib --enable-static --enable-shared --enable-shared-libgcc --enable-__cxa_atexit --with-gnu-ld --with-gnu-as --with-dwarf2 --disable-sjlj-exceptions --enable-languages=ada,c,c++,fortran,java,objc,obj-c++ --disable-symvers --enable-libjava --program-suffix=-4 --enable-libgomp --enable-libssp --enable-libada --enable-threads=posix AS=/opt/gcc-tools/bin/as.exe AS_FOR_TARGET=/opt/gcc-tools/bin/as.exe LD=/opt/gcc-tools/bin/ld.exe LD_FOR_TARGET=/opt/gcc-tools/bin/ld.exe
Thread model: posix
gcc version 4.3.2 20080827 (beta) 2 (GCC)

我犯了一个愚蠢的错误,感谢来自 Stephen 的评论

你的compare函数有问题,你的compare函数不会返回负数。修改compare为:
int compare(const void *p, const void *q)
{
return *(const char *)p - *(const char *)q;
}
再编译运行就正确了。

将文本文件读入数组-C语言实现

要求:使用 C 语言将文本文件的每一行读入为数组的一个元素,返回一个 char ** 指针。

由于行长度和文本文件行数均未知,相当于二维 char 数组的两维长度都未定义。由于 getline 函数可以自动扩充 char 数组长度,我最初的想法是使用 getline 得到每行,然后每次对 char ** 进行 realloc,直到读完整个文件。

但是这种做法并不好,首先 getline 是 glibc 的扩展,而不是 C 语言的标准函数,使用除 gcc 以外的编译器是不一定能编译通过的;其次,每次对 char ** 指针进行 realloc 显得代码很 ugly。可以使用 fgets 替代 getline,但是就要自己来控制一维 char 数组的长度。

后来想想,换了一种思路,首先将整个文件读入内存,然后根据 '\n' 的个数来计算文件的行数,作为二维数组的长度,然后将所有的 '\n' 替换成 '\0',并将每一行的指针赋给二维 char 数组,代码如下:

char ** text_2_array(const char *filename)
{
  char *p, **array;
  int lines;
  if(filename == NULL) return NULL;

  FILE *fp = fopen(filename, "r");
  if(fp == NULL) return NULL;

  /* Get file size. */
  fseek(fp, 0L, SEEK_END);
  long int f_size = ftell(fp);
  fseek(fp, 0L, SEEK_SET);

  /* Allocate space for file content. */
  char *buf = (char *) calloc(f_size, sizeof(char));
  if(buf == NULL) return NULL;

  fread(buf, sizeof(char), f_size, fp);
  fclose(fp);

  /* Get number of lines. */
  for(p=strchr(buf, '\n'), lines=1; p!=NULL; p=strchr(p, '\n'), lines++) {
    if(*p == '\n') p++;
  }

  /* Allocate space for array; split file buffer to lines by change '\n' to
     '\0'. */
  array = (char **) calloc(lines+1, sizeof(char*));
  array[0] = buf;
  for(p=strchr(buf, '\n'), lines=1; p!=NULL; p=strchr(p, '\n')) {
    if(*p == '\n') *p++ = '\0';
    if(p != NULL) array[lines++] = p;
  }
  /* Add a terminate NULL pointer. */
  array[lines] = NULL;
  return array;
}

其实读文本文件入数组这个功能在很多语言中是很简单的操作,比如 PHP 的 file 函数,或者 Bash 的 (`cat filename`),都可以直接实现这个功能。但是对 C 这种更低级的语言来说,貌似就没那么简单了。我想要了解的是,除了我上面提到的两种思路,有没有更简单或者直接的方法来解决这个问题?比如一些我不熟悉的函数,或者一些 trick。

vasprintf 会将空间分配到栈上吗?

由于提交过几次 Linux Fetion 的 bug 和 patch,Linux Fetion 的开发者邀请我加入了 Linux Fetion GUI 的维护者团队中。

昨天晚上和今天下午,我和邓东东(DDD)一直在调试一个 Linux Fetion 在 64 位电脑上的段错误 BUG。这是一个非常奇怪的 BUG,其表现为在 64 位电脑上(Ubuntu 9.04)运行 Linux Fetion 在登录成功后会经常出现 Segmentation Fault。DDD 确定该 BUG 存在于 Libfetion 库中,并且和读取联系人信息的函数有关。Libfetion 论坛上一直有人抱怨类似问题,但是在 DDD 的 64 位虚拟机上却无法重现此 BUG(他的 libc 是 2.7 版本的)。

由于 DDD 仍然不愿意公开 libfetion 库的源代码,我只好等每次他修改库文件之后发给我再调试。经过了好几个小时的努力,今天下午我发现,该 BUG 的主要成因非常有可能是:子函数中本应被动态分配到堆(heap)上的空间被分配(或误写)到了栈(stack)上,子函数返回调用者之后指向子函数栈内容的指针非法。

由于该动态分配的空间是使用 vasprintf 自动分配的,从 DDD 给我的部分代码来看指针传递出问题的可能性不大。那么我想,是不是 vasprintf 函数并不能保证动态分配的空间在 heap 上呢?希望对此有了解的朋友指点一下,谢谢!

$ uname -a
Linux Slytherin 2.6.28-12-generic #43-Ubuntu SMP Fri May 1 19:31:32 UTC 2009 x86_64 GNU/Linux
$ gcc -v
Using built-in specs.
Target: x86_64-linux-gnu
Configured with: ../src/configure -v --with-pkgversion='Ubuntu 4.3.3-5ubuntu4' --with-bugurl=file:///usr/share/doc/gcc-4.3/README.Bugs --enable-languages=c,c++,fortran,objc,obj-c++ --prefix=/usr --enable-shared --with-system-zlib --libexecdir=/usr/lib --without-included-gettext --enable-threads=posix --enable-nls --with-gxx-include-dir=/usr/include/c++/4.3 --program-suffix=-4.3 --enable-clocale=gnu --enable-libstdcxx-debug --enable-objc-gc --enable-mpfr --with-tune=generic --enable-checking=release --build=x86_64-linux-gnu --host=x86_64-linux-gnu --target=x86_64-linux-gnu
Thread model: posix
gcc version 4.3.3 (Ubuntu 4.3.3-5ubuntu4)
$ ll /lib/libc.so.6
lrwxrwxrwx 1 root root 11 2009-04-13 10:26 /lib/libc.so.6 -> libc-2.9.so

PS:我在写这篇博文过程中搜索了一下 Wikipedia,发现这样一段话:

int asprintf(char **ret, const char *format, ...)

asprintf automatically allocates enough memory to hold the final string. It sets *ret to a pointer to the resulting string, or to an undefined value if an error occurred (GLibc is notable in being the only implementation that doesn't always set *ret to NULL on error).

那么是不是分配失败导致了错误的发生呢?但是如果分配失败,为什么子函数返回前的指针的确指向一段在栈上的字符串呢?

多余的逗号?

晚上看了两页 The Art of Unix Programming,其中提到了一个我以前一直感觉困惑的地方:

在我看过的 C/C++ 语言程序代码中,为什么有的列表初始化时在最后元素后会加逗号“,”,而有的不会?
例如:int[] a = { 1, 2, 3, };

书中的原话倒不是讨论逗号该不该加,而是说到了这样做能带来的好处:

A good example is C accommodating an extra comma at the end of an array initializer list, which makes both editing and machine generation of array initializers much easier.
-- The Art of Unix Programming (TAOUP) Ch8.3.1

哦,虽然我一直体会到这样做的好处(尤其当列表成员又臭又长且要经常修改时),也晓得这样做不会引起编译错误,但我经常是在代码 stable 之后将最后的逗号去掉——原因无它,不确定这样做是不是没有问题,那么还是尽量避免吧。今天忽然看到 TAOUP 提到这个,我就好奇:到底是 C/C++ 标准允许这样做呢?还是编译器的实现大部分支持这样做?于是就查了一下。

结果让我很开心,C/C++ 标准中就允许这样做:

initializer:
    assignment-expression
    { initializer-list }
    { initializer-list , }

-- ISO/IEC 9899:1999 (C99) Ch6.7.8 §1

initializer-clause:
    assignment-expression
    { initializer-list ,opt }
    { }

-- ISO/IEC 14882:1998 (C++98) Ch8.5 §1

K&R 中也用非常简短的一句话提到了这个特性:

A list may end with a comma, a nicety for neat formatting.
-- The C Programming Language (K&R) Appendix 8.7

这意味着(C/C++ 语言中)在元素列表最后加上一个逗号是一件非常安全的事情,看来我以后不必再考虑删除列表最后那个逗号了,这样能省却我很多麻烦。

延伸阅读:在其它编程语言中,是否支持这样做呢?Arrays: additionnal commas 这篇文章进行了一个很有意思的讨论。

Something About Scanf() In C (1)

一. scanf 函数输入格式中的字符串

scanf 函数输入格式中也可以含有普通字符串,但他的含义是这些字符必须在输入中出现,例如:

int num;
scanf("hello %d", &num);

他的含义是首先要求输入一个hello字符串,然后再输入一个十进制数。 注意在等待输入时忽略hello与要输入的数之间的空格,制表符,回车。

因此这两种输入都是正确的: hello 1234 hello1234

二. scanf函数的返回值

程序:

{
  int num, result=0;
  printf("please input the student’s score: ");
  while(result==0) {
    /* 清空输入缓冲区。 */
    fflush(stdin);
    if(result!=1) printf("Please input a digital score: ");
    result=scanf("%d",&num);
  }
}

一切OK!

三. scanf函数中一个参数的应用

在 scanf 函数中,我们可以使用 %c 来读取一个字符,使用 %s 读取一个字符串。 但是读取字符串时不忽略空格,读字符串时忽略开始的空格,并且读到空格为止,因此我们只能读取一个单词,而不是整行字符串。因此一般使用 fgets 来读取一个字符串。

其实 scanf 函数也可完成这样的功能,而且还更强大。 这里主要介绍一个参数:%[] ,这个参数的意义是读入一个字符集合。 [] 是个集合的标志,因此 %[] 特指读入此集合所限定的那些字符, 比如 %[A-Z] 是输入大写字母,一旦遇到不在此集合的字符便停止。 如果集合的第一个字符是"^", 这说明读取不在 "^" 后面集合的字符,既遇到 "^" 后面集合的字符便停止。注意此时读入的字符串是可以含有空格的。 Eg: 输入一个字符串, 这个字符串只含有小写字符。遇到第一个不是小写字符时停止, scanf("%[a-z], str); Eg: 想输入一个字符串, 遇到 "." 停止,可设计如下: scanf("%[^.]", str); 使用这个参数,你可以完成许多强大的功能呦!

通常来讲,scanf 函数和他的一些参数并不是很常用,主要是因为:

1. 许多系统的 scanf 函数都有漏洞。 (典型的就是TC再输入浮点型时有时会出错)。

2. 用法复杂,容易出错。

3. 编译器作语法分析时会很困难,从而影响目标代码的质量和执行效率。

About "double" in C

昨天发现一个很有趣的现象,在 Turbo C 里 double 类型的变量无法用通常模式进行输入操作,即无法用 scanf() 进行赋值,程序举例:

void main()
{
  double a,b,c;
  scanf("%f%f%f",&a,&b,&c);
  printf("%f%f%f",a,b,c);
}

输出结果均是 0.000000,猜想 C 语言应该没有默认初始值的功能,之所以是 0.000000 可能是保留六位小数的原因,由于数值较小且跟地址有关,输入的数值没有传入到地址中。如果将 double 型改为 float 型,则能正常操作。是何道理?难道 C 里输入函数对 double 不兼容?还是另有其他输入方法?回去要查一下 MSDN,不知道 C 有没有专门的帮助文档。

还有要注意的是 TC2 和 TC3 之间对程序要求的变化,在 TC3 中如果不包含标准输入输出的头文件 stdio.h,程序中使用 scanf 和 printf 会报错,而且主函数也必须声明类型,不然会有警告。难道是 ANSI C 的标准变了?还是 TC 为了完善自己只适用于 TC?看来还是要注意编译软件的兼容性问题。 C 语言在好多地方还是不如 C++ 呀!

PS: 输出结果是 0.000000 的原因是因为 scanf() 如果取不到值就会把变量赋零。

2009年6月11日15:40
只因当时年纪小呀,这篇博客现在让我看都快要笑死了。为了不误导读者,特更正如下:
1. 之所以读取错误,是因为本文中程序是错的,%f 匹配浮点类型,%lf 才能匹配 double 类型,这不是 scanf 的错。
2. TC3 对程序的检查是正确的,C 标准确实有变化,我当时是受一本错误的 C 语言教科书误导。
3. C 虽然很多地方不如 C++,但是 C 有简洁性的长处,C++ 是不能比的。