JVM管理的内存可以总体划分为两部分:Heap Memory和Native Memory。前者我们比较熟悉,是供Java应用程序使用的;后者也称为C-Heap,是供JVM自身进程使用的。Heap Memory及其内部各组成的大小可以通过JVM的一系列命令行参数来控制,在此不赘述。Native Memory没有相应的参数来控制大小,其大小依赖于操作系统进程的最大值(对于32位系统就是3~4G,各种系统的实现并不一样),以及生成的Java字节码大小、创建的线程数量、维持java对象的状态信息大小(用于GC)以及一些第三方的包,比如JDBC驱动使用的native内存。

Native Memory里存些什么?

  1. 管理java heap的状态数据(用于GC);
  2. JNI调用,也就是Native Stack;
  3. JIT(即使编译器)编译时使用Native Memory,并且JIT的输入(Java字节码)和输出(可执行代码)也都是保存在Native Memory;
  4. NIO direct buffer。对于IBM JVM和Hotspot,都可以通过-XX:MaxDirectMemorySize来设置nio直接缓冲区的最大值。默认是64M。超过这个时,会按照32M自动增大。
  5. 对于IBM的JVM某些版本实现,类加载器和类信息都是保存在Native Memory中的。

DirectBuffer的好处

DirectBuffer访问更快,避免了从HeapBuffer还需要从java堆拷贝到本地堆,操作系统直接访问的是DirectBuffer。DirectBuffer对象的数据实际是保存在native heap中,但是引用保存在HeapBuffer中。
另外,DirectBuffer的引用是直接分配在堆得Old区的,因此其回收时机是在FullGC时。因此,需要避免频繁的分配DirectBuffer,这样很容易导致Native Memory溢出。

为什么会内存溢出?

简单理解java process memory = java heap + native memory。因此内存溢出时,首先要区分是堆内存溢出还是本地内存溢出。Native Memory本质上就是因为耗尽了进程地址空间。对于HotSpot JVM来书,不断的分配直接内存,会导致如下错误信息:Allocated 1953546760 bytes of native memory before running out

参考资料:

http://www.ibm.com/developerworks/library/j-nativememory-linux/index.html
http://www.techpaste.com/2012/07/steps-debugdiagnose-memory-memory-leaks-jvm/
https://sourcevirtues.wordpress.com/2013/01/14/java-heap-space-and-native-heap-problems/
http://www.theotherian.com/2013/08/understanding-javas-native-heap-or-c-heap.html
http://www.ibm.com/developerworks/library/l-kernel-memory-access/
http://www.ibm.com/developerworks/library/j-zerocopy/
http://en.wikipedia.org/wiki/Direct_memory_access

Spymemcached是一个Memcached(一个高性能内存对象缓存系统)的开源客户端程序。其他MC客户端还有xmemcached、Memcached Client for Java等。我们使用MC客户端程序与MC服务端通信,实现set、get缓存的操作,MC支持的协议类型包括:ASCII 文本协议和二进制协议。

Spymemcached使用Java NIO来实现,并且是单线程的,也就是说在Spymemcached中只有一个主循环线程来处理各种IO事件(Connection、Read、Write)。对于业务线程来说,调用Spymemcached提供的API即可实现对服务端缓存的增、删、查、改操作,很是便利。处理业务线程操作的具体过程如下:

  1. Spymemcached将每一个业务线程的操作都封装为一个Operation对象,然后放入内部的inputQueue队列中。
  2. 在放入inputQueue的时候,调用selector.wakup()方法唤醒内部IO线程。
  3. IO线程被唤醒后,会将inputQueue中所有的Operation拷贝到writeQueue中去,并且会立马对writeQueue中的Operation做IO写操作,而且会注册读事件,如果必要(还有可写的Operation或者写buf中还有数据)还会注册写事件。
  4. 一旦MC服务端有数据返回,就会触发读事件,然后客户端就会返回缓存对象给业务线程。

下图描述了上述过程:

背景知识

c3p0是一个开源的、基于Java JDBC 规范的连接池管理框架。
官网地址:[http://www.mchange.com/projects/c3p0/]

获取连接的过程

  1. c3p0构造了一个Connection对象池。
  2. 在对象池中有空闲对象时或者没有达到对象池最大数量时,获取Connection都会成功返回。但是要注意,获取到的Connection不一定是可用的(比如服务端MySQL进程挂起、或者服务器掉电等极端情况下)。
  3. 由于空闲的Connetion不一定是可用的,所以c3p0会启用一个检查线程池来检查空闲对象的可用性。检查时,会将该空闲对象加入一个idleCheckResources(Set类型);检查完成后,再做出删除操作。其实idleCheckResources就是unused(LinkedList)的一个浅层拷贝。空闲连接检查就是依靠执行select 1 sql来验证。
  4. 在我们获取Connection对象后,会检查获取的Connection是否在idleCheckResources中,如果在则等待mytemplate.pool.timeout时间后,递归调用本方法。
  5. 由于第四步的检查机制,导致一个严重的问题。假设对象池中现在有3个连接,而且都是空闲的。并且这个时候三个检查线程都阻塞在select 1了(比如数据库挂起、或者断电、网络延时),需要说明的是这种阻塞是无限期的,即使在执行statement设置了超时时间也是一直阻塞,何况c3p0源码中并没有设置超时时间。那么这个时候,业务线程已经拿到了其中一个对象,那么进行第4步的检查时,就会导致无限递归方法的出现(这种情况就类似自旋锁的死锁问题了),阻塞住getConnection,最终栈溢出。

下图描述了这一个过程:

那么为什么对于设置了statement的超时时间没有效果呢?这是因为MySQL的statement超时机制决定的,其机制如下图:

关键是第8步,同样是要发送sql来取消执行,所以这种时候取消sql也会被阻塞住。根本原因是阻塞在了socket.read()上了,所以发送的取消sql同样会面临阻塞的问题。statement超时一般是为了控制数据库正常时候sql的超时执行的。对于数据库异常时候的超时,应该使用mysql的socket超时。mysql的socket超时只能在启动参数中配置,没有api来提供配置功能。或者还自己来实现异步的检测机制。

JDBC相关的超时机制详解可以参考:http://www.cubrid.org/blog/dev-platform/understanding-jdbc-internals-and-timeout-configuration/

问题背景

OP_WRITE事件是在Socket发送缓冲区中的可用字节数大于或等于其低水位标记SO_SNDLOWAT时发生。正常情况下,都是可写的,因此一般不注册写事件。所以一般代码如下:

1
2
3
4
5
6
while (bb.hasRemaining()) {
int len = socketChannel.write(bb);
if (len < 0) {
throw new EOFException();
}
}

这样在大部分情况都没问题,但是高并发,并且在网络环境很差的情况下,发送缓冲区可能会满,导致无限循环,这样最终会导致CPU利用率100%。下面就看看一些基于NIO的框架,是如何处理这个问题的。

Spymemcached的处理方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
  private void handleWrites(SelectionKey sk, MemcachedNode qa)
throws IOException {

// 填充写缓冲区
qa.fillWriteBuffer(shouldOptimize);
boolean canWriteMore = qa.getBytesRemainingToWrite() > 0;
while (canWriteMore) {
int wrote = qa.writeSome();
qa.fillWriteBuffer(shouldOptimize);
// 如果wrote等于零,表示没有写出数据,那么不再尝试写,等待下次线程外层循环注册write事件
canWriteMore = wrote > 0 && qa.getBytesRemainingToWrite() > 0;
}

public final int writeSome() throws IOException {
int wrote = channel.write(wbuf);
// 写入多少个字节,toWrite就减去对应的数量
toWrite -= wrote;
return wrote;
}

public final int getSelectionOps() {
int rv = 0;
if (getChannel().isConnected()) {
if (hasReadOp()) {
rv |= SelectionKey.OP_READ;
}
// 如果toWrite大于0,说明由于某种异常原因上次写入还未完成;hasWriteOp()用于判断写队列是否还有元素。这两种情况下,需要注册写事件。本文讨论的是toWrite>0的情况。
if (toWrite > 0 || hasWriteOp()) {
rv |= SelectionKey.OP_WRITE;
}
} else {
rv = SelectionKey.OP_CONNECT;
}
return rv;
}

说明:Spymemcached是单线程的,因此就是绝对不能阻塞,所以当发现不可写的时候,不能阻塞住线程,而是立即返回,等待下次主线程循环来注册事件。

Netty的处理方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
protected void write0(AbstractNioChannel<?> channel) {
boolean open = true;
boolean addOpWrite = false;
boolean removeOpWrite = false;
boolean iothread = isIoThread(channel);

long writtenBytes = 0;

final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
final WritableByteChannel ch = channel.channel;
final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
final int writeSpinCount = channel.getConfig().getWriteSpinCount();
List<Throwable> causes = null;

synchronized (channel.writeLock) {
channel.inWriteNowLoop = true;
for (;;) {
MessageEvent evt = channel.currentWriteEvent;
SendBuffer buf = null;
ChannelFuture future = null;
try {
if (evt == null) {
if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
// 如果无数据可写,则需要删除可写事件的注册
removeOpWrite = true;
channel.writeSuspended = false;
break;
}
future = evt.getFuture();

channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
} else {
future = evt.getFuture();
buf = channel.currentWriteBuffer;
}

long localWrittenBytes = 0;
// 通过writeSpinCount来控制尝试写的次数,如果最终还是无法写入,就注册写事件
for (int i = writeSpinCount; i > 0; i --) {
// 写数据
localWrittenBytes = buf.transferTo(ch);
// 如果写入数据不等于零,表明写入成功,跳出循环
if (localWrittenBytes != 0) {
writtenBytes += localWrittenBytes;
break;
}
// 如果buf的数据都写完了,则跳出循环
if (buf.finished()) {
break;
}
}

if (buf.finished()) {
// Successful write - proceed to the next message.
buf.release();
channel.currentWriteEvent = null;
channel.currentWriteBuffer = null;
// Mark the event object for garbage collection.
//noinspection UnusedAssignment
evt = null;
buf = null;
future.setSuccess();
} else {
// Not written fully - perhaps the kernel buffer is full.
addOpWrite = true;
channel.writeSuspended = true;

if (writtenBytes > 0) {
// Notify progress listeners if necessary.
future.setProgress(
localWrittenBytes,
buf.writtenBytes(), buf.totalBytes());
}
break;
}
}
}
channel.inWriteNowLoop = false;

if (open) {
if (addOpWrite) {
// 注册写事件
setOpWrite(channel);
} else if (removeOpWrite) {
// 删除写事件
clearOpWrite(channel);
}
}
}
}

说明:Netty是多线程的,因此其可以通过阻塞线程做一定的等待,等待通道可写。Netty等待是通过spinCount等待指定的循环次数。

Grizzly(诞生子Glass Fish项目)的处理方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public static long flushChannel(SocketChannel socketChannel, ByteBuffer bb, long writeTimeout)
throws IOException {

SelectionKey key = null;
Selector writeSelector = null;
int attempts = 0;
int bytesProduced = 0;
try {
while (bb.hasRemaining()) {
int len = socketChannel.write(bb);
// 类似Netty的spinCount
attempts++;
if (len < 0) {
throw new EOFException();
}
bytesProduced += len;
if (len == 0) {
if (writeSelector == null) {
// 获取一个新的selector
writeSelector = SelectorFactory.getSelector();
if (writeSelector == null) {
// Continue using the main one
continue;
}
}
// 在新selector上注册写事件,而不是在主selector上注册
key = socketChannel.register(writeSelector, key.OP_WRITE);
// 利用writeSelector.select()来阻塞当前线程,等待可写事件发生,总共等待可写事件的时长是3*writeTimeout
if (writeSelector.select(writeTimeout) == 0) {
if (attempts > 2)
throw new IOException("Client disconnected");
} else {
attempts--;
}
} else {
attempts = 0;
}
}
}
return bytesProduced;
}

说明:Grizzly是多线程的,因此其可以做合适的阻塞等待。其没有再主selector上注册写事件,而是在重新构造的selector上注册写事件,并且通过select()来阻塞一定的时间来等待可写。

为什么要这么做呢?Grizzly的作者对此的回应如下:

  1. 使用临时的Selector的目的是减少线程间的切换。当前的Selector一般用来处理OP_ACCEPT,和OP_READ的操作。使用临时的Selector可减轻主Selector的负担;而在注册的时候则需要进行线程切换,会引起不必要的系统调用。这种方式避免了线程之间的频繁切换,有利于系统的性能提高。
  2. 虽然writeSelector.select(writeTimeout)做了阻塞操作,但是这种情况只是少数极端的环境下才会发生。> 大多数的客户端是不会频繁出现这种现象的,因此在同一时刻被阻塞的线程不会很多。
  3. 利用这个阻塞操作来判断异常中断的客户连接。
  4. 经过压力实验证明这种实现的性能是非常好的。

信号的定义和分类

信号是软件中断,提供了典型的异步机制。每个信号有一个编号,信号分为两类:非实时信号和实时信号。0-31编号属于非实时信号;31-63编号属于实时信号。为什么会分为这两类信号呢?这个主要是因为历史原因,首先实现的是非实时信号,非实时信号也成为不可靠信号,是因为其实现机制导致这类信号可能会丢失;而实时信号,由于存在排队机制,所以不会丢失。关于这点会在信号的处理过程图示中详细阐述。

信号的来源

信号事件的发生有两个来源:硬件来源(比如我们按下了键盘或者其它硬件故障);软件来源,最常用发送信号的系统函数是kill, raise, alarm和setitimer以及sigqueue函数,软件来源还包括一些非法运算等操作。

信号的处理时机

每个信号都可以被关联1个信号处理函数,如果没有关联,其处理动作是系统默认的,大部分都是动作都是忽略,具体每个信号的默认处理动作可以谷歌查询。在目标进程执行过程中,会检测是否有信号等待处理(每次从系统空间返回到用户空间时都做这样的检查),如果有,则调用信号处理函数。

信号的状态

实际执行信号的处理动作称为信号递达,信号从产生到递达之间的状态成为未决。进程既可以忽略信号,也可以阻塞信号。阻塞的意思是信号不会被递达,而忽略是信号递达之后的一个可选动作。信号在内核中的表示包括两个标记位:阻塞标记位和未决标记位以及信号处理函数的指针。

信号的处理过程

  1. 如果存在未决信号等待处理且该信号没有被进程阻塞,则在运行相应的信号处理函数前,进程会把信号在未决信号链中占有的结构卸掉。是否将信号从进程未决信号集中删除对于实时与非实时信号是不同的。
  2. 对于非实时信号来说,由于在未决信号信息链中最多只占用一个sigqueue结构,因此该结构被释放后,应该把信号在进程未决信号集中删除(信号注销完毕)。
  3. 对于实时信号来说,可能在未决信号信息链中占用多个sigqueue结构,因此应该针对占用sigqueue结构的数目区别对待:如果只占用一个sigqueue结构(进程只收到该信号一次),则应该把信号在进程的未决信号集中删除(信号注销完毕)。否则,不应该在进程的未决信号集中删除该信号(信号注销完毕)。

下图就是我理解的信号处理过程: