进程(process) 与 线程(thread) 的区别:每个进程都拥有自己的一整套变量,线程则共享数据;线程更“轻量级”,创建、撤销一个线程比启动一个进程的开销小很多。
1. 什么是线程
实例:设计一个转账程序,同时在银行账户 0、1 与 2、3 之间转账。
在线程中执行任务的过程:
- 将执行任务的代码放在一个类的
run()中,该类需要实现Runnable接口。
public interface Runnable {
void run();
}Runnable 是一个函数式接口,可以用一个 lambda 创建实例:
Runnable r = () -> {
// 执行任务
};- 从该实例构造一个
Thread对象:
Thread t = new Thread(r);- 启动该
Thread对象:
t.start();Runnable r = () -> {
try {
for (int i = 0; i < STEPS; i++) {
double amount = MAX_AMOUNT * Math.random();
bank.transfer(0, 1, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
}
catch (InterruptedException e) {
}
};
Thread t = new Thread(r);
t.start();要捕获 sleep() 可能抛出的 InterruptedException,中断用于请求终止一个线程,出现 InterruptedException() 时 run() 将退出。
该程序还将启动第二个线程,在账户 2、3 之间转账,运行时发现两对账户之间的转账信息交错输出,说明它们在并发运行。
class ThreadTest {
public static final int DELAY = 10;
public static final int STEPS = 100;
public static final double MAX_AMOUNT = 1000;
public static void main(String[] args) {
Bank bank = new Bank(4, 100000);
Runnable task1 = () -> {
try {
for (int i = 0; i < STEPS; i++) {
double amount = MAX_AMOUNT * Math.random();
bank.transfer(0, 1, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
}
catch (InterruptedException e) {
}
};
Runnable task2 = () -> {
try {
for (int i = 0; i < STEPS; i++) {
double amount = MAX_AMOUNT * Math.random();
bank.transfer(2, 3, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
}
catch (InterruptedException e) {
}
};
new Thread(task1).start();
new Thread(task2).start();
}
}
class Bank {
private final double[] accounts;
public Bank(int n, double initialBalance) {
accounts = new double[n];
Arrays.fill(accounts, initialBalance);
}
public void transfer(int from, int to, double amount) {
if (accounts[from] < amount) return;
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}
public double getTotalBalance() {
double sum = 0;
for (double a : accounts) sum += a;
return sum;
}
public int size() {
return accounts.length;
}
}2. 线程状态
线程的 6 种状态:
- New 新建
- Runnable 可运行
- Blocked 阻塞
- Waiting 等待
- Timed waiting 计时等待
- Terminated 终止
getState():获取某个线程的当前状态。
1) 新建线程
用操作符 new 创建一个新线程时,它尚未运行,其状态为 New 新建,线程运行之前还有一些基础工作要做。
2) 可运行线程
调用 start() 后,线程的状态为 Runnable 可运行,它可能在运行、也可能没在运行。
一个线程开始运行后,它不一定始终保持运行,实际上,运行中的线程有时需要暂停,使其他线程有机会运行。
线程调度的细节依赖于操作系统提供的服务:
- 抢占式调度:给每一个可运行线程一个时间片来执行任务,时间片用完时该线程的运行权被剥夺,并给另一个线程机会以运行,对下一个线程的选择取决于优先级。所有现代桌面和服务器操作系统都采用此调度。
- 协作式调度:线程只有在被调用
yield()、或被阻塞、或等待时才失去运行权。手机等小型设备采用此调度。
3) 阻塞和等待线程
线程处于 Blocked 阻塞 或 Waiting 等待 状态时,它不执行代码,且消耗最少资源。由线程调度器激活时的具体细节取决于它曾如何到达非活动状态。
- 当线程试图获取一个内部的对象锁,而该锁目前被其他线程占有,它将进入 Blocked 阻塞 状态;当所有其他线程都释放了该锁,且线程调度器允许该线程持有该锁时,它将进入非阻塞状态。
- 当线程等待另一个线程通知调度器出现某个条件时,它将进入 Waiting 等待 状态。实际上,阻塞与等待区别不大。
- 有些方法具有超时参数,调用它们将使线程进入 Timed Waiting 计时等待 状态,保持至超时期满或者接收到适当的通知;具有超时参数的方法有
Thread::sleep和计时版的Object::wait、Thread::join、Lock::tryLock、Condition::await。
4) 终止线程
run()正常退出时,线程自然终止。run()由于未捕获的异常退出时,线程意外终止。
3. 线程属性
1) 中断线程
interrupt():请求线程终止。
中断状态(interrupted status):每个线程都拥有的 boolean 标志,标记线程是否被中断,调用 interrupt() 将为其设置值。
Thread.currentThread():获取当前线程。
isInterrupted():检查当前线程的中断状态。
while (!Thread.currentThread().isInterrupted() && /* ? */) {
// 操作
}处于阻塞状态的线程不能被检查中断状态,对一个因被调用 sleep() 或 wait() 而阻塞的线程调用 interrupt() 时,该阻塞方法(sleep() / wait())将被一个 InterruptedException 中断。
中断不意味着终止,中断一个线程只是要引起其注意,被中断的线程可以决定如何响应。重要线程应处理此异常并继续运行;更普遍的情况下,线程只将中断看作一个终止请求,此类线程的 run() 有如下形式:
Runnable r = () -> {
try {
// ...
while (!Thread.currentThread().isInterrupted() && /* ? */) {
// 操作
}
}
catch (InterruptedException e) {
// 处理异常
}
finally {
// 清理
}
};如果每次工作周期后都调用 sleep()(或类似方法),那么 isInterrupted() 检查既没有必要也没有用处;如果设置了中断状态,再调用 sleep(),线程将不会休眠,而是清除中断状态并抛出 InterruptedException。因此,如果循环调用 sleep(),不要进行 isInterrupted() 检查,而应捕获 InterruptedException:
Runnable r = () -> {
try {
// ...
while (/* ? */) {
// 操作/
Thread.sleep(delay);
}
}
catch (InterruptedException e) {
// 处理异常
}
finally {
// 清理
}
};两个相似的方法 interrupted() 和 isInterrupted():
interrupted()是一个静态方法,它检查当前线程是否被中断,并清除该线程的中断状态。isInterrupted()是一个实例方法,它检查是否有线程被中断,不会改变线程的中断状态。
不要抑制 InterruptedException,如果想不出如何处理异常,还有如下选择:
- 在
catch子句中调用Thread.currentThread().interrupt()设置中断状态:
void mySubTask() {
// ...
try {
sleep(delay);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// ...
}这样一来,方法调用者可以检测到中断状态。
- 标记
throws InterruptedException,并删除try子句,将异常传递给方法调用者。
2) 守护线程
守护线程(daemon thread):为其他线程提供服务的线程。
setDaemon():将一个线程转换为守护线程,调用 t.setDaemon(true)。
守护线程的例子:
- 计时器线程,它定时向其他线程发送”计时器嘀嗒“信号。
- 清空过时缓存项的线程。
只剩下守护线程时,JVM 将退出。
3) 线程名
setName():为线程命名。
Thread t = new Thread(runnable);
t.setName("Web crawler");4) 未捕获异常的处理器
P578-579
5) 线程优先级
线程优先级曾经很有用,不过现在最好不要再使用。
默认情况下,线程将继承构造它的线程的优先级。
setPriority():设置线程的优先级,MIN_PRIORITY 为 1,MAX_PRIORITY 为 10,NORM_PRIORITY 为 5。
4. 同步
竞态条件(race condition):两个线程同时调用修改同一个对象状态的方法时,互相覆盖导致对象被破坏。
1) 竞态条件的一个例子
为了避免多线程破坏共享数据,需要 同步存取(synchronize the access)。
// 上例中的 Bank::transfer
public void transfer(int from, int to, double amount) {
if (accounts[from] < amount) return;
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}与此前不同,现要随机选择转账的源账户与目标账户。
/*更改后的run*/
Runnable task = () -> {
try {
while (true) {
int fromAccount = (int) (bank.size() * Math.random());
int toAccount = (int) (bank.size() * Math.random());
double amount = MAX_AMOUNT * Math.random();
bank.transfer(fromAccount, toAccount, amount);
Thread.sleep((int) (DELAY * Math.random()));
}
}
catch (InterruptedException e) {
}
};Thread[Thread-0,5,main] 205.19 from 11 to 17 Total Balance: 100000.00
Thread[Thread-0,5,main] 58.81 from 83 to 10 Total Balance: 100000.00
Thread[Thread-1,5,main] 17.64 from 76 to 44 Total Balance: 100000.00
Thread[Thread-1,5,main] 103.91 from 10 to 26 Total Balance: 100000.00
Thread[Thread-0,5,main]Thread[Thread-1,5,main] 898.76 from 72 to 73 Total Balance: 99022.52
977.48 from 99 to 93 Total Balance: 100000.00
Thread[Thread-1,5,main] 807.07 from 60 to 88 Total Balance: 100000.00
Thread[Thread-0,5,main] 718.80 from 73 to 59 Total Balance: 100000.00
Thread[Thread-1,5,main] 210.20 from 27 to 70 Total Balance: 100000.00
Thread[Thread-1,5,main] 346.46 from 62 to 44 Total Balance: 100000.00
Thread[Thread-1,5,main] 915.42 from 26 to 5 Total Balance: 100000.00
Thread[Thread-1,5,main]Thread[Thread-0,5,main] 262.67 from 12 to 58 Total Balance: 99860.93
139.07 from 32 to 62 Total Balance: 100000.00
Thread[Thread-0,5,main] 816.29 from 74 to 21 Total Balance: 100000.00可以看到,这里出现了错误,对于最初的几次交易,所有账户的总金额保持在 100000,不过运行几次之后,总金额出现了错误。
2) 竞态条件详解
在上例中,错误发生于两个线程同时试图更新同一个账户时,例如,它们可能同时执行指令 accounts[toAccount] += amount,问题在于这个调用不是原子操作,而是分步操作:
- 将
accounts[toAccount]加载到寄存器。 - 增加
amount。 - 将结果写回
accounts[toAccount]。
假设第 1 个线程执行步骤 1、2 后其运行权被抢占,第 2 个线程被唤醒,执行完全部操作后,第 1 个线程被唤醒并完成刚才余下的步骤 3,此时第 2 个线程的修改被第 1 个进程所覆盖,产生错误。
如果能确保线程失去控制权之前方法已经运行完成,则可以避免这种错误。
3) 锁对象
有两种机制可以防止并发访问同一个代码块:
- 关键字
synchronized。 ReentrantLock类。
用 ReentrantLock 类保护代码块的基本结构如下:
myLock.lock(); // 一个 ReentrantLock 类对象
try {
// 关键代码段
}
finally {
myLock.unlock(); // 确保锁被解锁
}一旦一个线程调用 lock(),其他线程调用 lock() 时将被暂停,直到第一个线程被解锁,此结构确保同一时间最多只有一个线程访问关键代码段。
必须将 unlock() 的调用包含在 finally 中,无论是否有异常抛出都必须解锁,否则其他线程将永远阻塞。
class Bank {
private Lock bankLock = new ReentrantLock();
// ...
public void transfer(int from, int to, double amount) {
bankLock.lock();
try {
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}
finally {
bankLock.unlock();
}
}
// ...
}bankLock 称为 重入锁(reentrant lock),因为线程可以反复获得已拥有的锁。
锁有一个 持有计数(hold count) 来记录对 lock() 的嵌套调用次数,线程每次调用 lock() 后都要相应调用一次 unlock() 解锁。
由一个锁保护的代码段可以调用另一个同样使用该锁的方法,例如,transfer() 调用 getTotalBalance() 时,也将锁定 bankLock,此时它的持有计数为 2;getTotalBalance() 退出时,持有计数变为 1;transfer() 退出时,持有计数变为 0,bankLock 被解锁。
4) 条件对象
有时,线程进入临界区后发现只有满足了某个条件后它才能运行,可以使用 条件对象(condition object) 来管理那些获得了锁却不能工作的线程。
现在改进转账程序,限制一个账户转出资金的前提是它有足够的资金。
这段代码是不可取的:
if (bank.getBalance(from) >= amount) bank.transfer(from, to, amount);在通过测试之后、调用 transfer() 之前,当前线程有可能被中断,之后它继续运行时,账户可能已经余额不足。
必须确保在检查余额与转账操作之间没有其他线程修改余额,可以使用锁来保护此操作:
public void transfer(int from, int to, double amount) {
bankLock.lock();
try {
while (accounts[from] < amount) {
// 等待其他线程转入资金
}
// 转账操作
}
finally {
bankLock.unlock();
}
}当账户中没有足够金额时,程序将等待其他线程为其增加足够的金额,但是此线程拥有对 bankLock 的独占访问权,其他线程没有机会进行转账,这里就要引入条件对象。
条件对象的类型为 Condition,由对一个锁调用 newCondition() 新建,一个锁可以关联任意数量的条件对象,条件对象应该有一个合适的名称以反映它表示的条件。
class Bank {
private Condition sufficientFunds; // 表示资金充足的状态
// ...
public Bank() {
// ...
sufficientFunds = bankLock.newCondition();
}
// ...
}如果检查发现金额不足,它将调用:
sufficientFunds.await();当前线程被暂停并解锁,以允许其他线程执行。
等待获得锁的线程和被调用 await() 的线程存在本质上的不同,如果线程被调用 await(),它将进入该条件的 等待集(wait set),即使锁可用时,它也不会进入可运行状态,直到另一个线程对该条件对象调用 signalAll()。
signalAll():通知在等待某条件满足的线程,现在条件有可能满足(而不是已经满足),线程应重新测试条件是否满足;它只是解除所有等待中线程的阻塞状态、使它们可以在当前线程解锁后竞争访问对象,而不是激活线程。
此例中,当另一个线程完成转账时,应该对 sufficientFunds 调用 signalAll():
sufficientFunds.signalAll();此调用将激活正在等待条件的所有线程,从等待集中移出的线程将进入可运行状态,由调度器将它们再次激活。一旦锁可用,它们中的一个将从 await() 调用返回,锁定该锁,并从之前暂停处继续执行。
signal():随机选择等待集中的一个线程,解除其阻塞状态,如果该线程发现条件仍未满足,将再次阻塞,这比解除所有线程的阻塞状态更高效,但也存在风险。
一个线程必须锁定了一个带条件的锁,它才能对那些条件调用 await() / signalAll() / signal()。
await() 调用通常放在 while 内:
while (!(/* 条件满足 */)) condition.await();最终需要由其他某个线程调用 signalAll(),因为当对一个线程调用 await() 时,它无法自行重新激活,如果没有被其他线程激活,它将永远无法运行,这就是 死锁(deadlock)。如果所有线程都被阻塞,没有任何线程可以解锁其他线程的阻塞状态,程序将永远挂起。
如果一个对象的状态发生变化可能导致条件被满足,就应调用 signalAll()。
// 每次完成转账后都调用 signalAll
public void transfer(int from, int to, double amount) {
bankLock.lock();
try {
while (accounts[from] < amount) sufficientFunds.await();
// 转账
sufficientFunds.signalAll();
}
finally {
bankLock.unlock();
}
}class Bank {
private final double[] accounts;
private Lock bankLock;
private Condition sufficientFunds;
public Bank(int n, double initialBalance) {
accounts = new double[n];
Arrays.fill(accounts, initialBalance);
bankLock = new ReentrantLock();
sufficientFunds = bankLock.newCondition();
}
public void transfer(int from, int to, double amount) throws InterruptedException {
bankLock.lock();
try {
while (accounts[from] < amount) sufficientFunds.await();
System.out.print(Thread.currentThread());
accounts[from] -= amount;
System.out.printf(" %10.2f from %d to %d", amount, from, to);
accounts[to] += amount;
System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
sufficientFunds.signalAll();
}
finally {
bankLock.unlock();
}
}
public double getTotalBalance() {
bankLock.lock();
try {
double sum = 0;
for (double a : accounts) sum += a;
return sum;
}
finally {
bankLock.unlock();
}
}
public int size() {
return accounts.length;
}
}锁与条件小结:
- 锁用来保护代码段,一次只允许一个线程执行被保护的代码。
- 锁可以管理试图进入被保护代码段的线程。
- 一个锁可以有一个或多个关联的条件对象。
- 每个条件对象管理那些已经进入被保护代码段但还不能运行的线程。
5) synchronized 关键字
每个对象都有一个 内部锁(intrinsic lock),如果将一个方法声明为 synchronized,对象的内部锁将保护该方法,线程在调用它之前必须获得内部对象锁。
public synchronized void method() { }
// 等价于
public void method() {
this.intrinsicLock.lock();
try {
// ...
}
finally {
this.intrinsicLock.unlock();
}
}对于上例,可以将 transfer() 声明为 synchronized。
内部对象锁只能有一个关联条件。wait() 对应于 await(),notifyAll() / notify() 对应于 signalAll() / signal()。
class Bank {
// ...
public synchronized void transfer(int from, int to, double amount) throws InterruptedException {
while (accounts[from] < amount) wait();
// ...
notifyAll();
}
public synchronized double getTotalBalance() { }
}如果将静态方法声明为 synchronized,调用时,它将获得关联类对象的内部锁。例如,如果 Bank 类有一个静态 synchronized 方法,调用它时将锁定 Bank 对象的内部锁,此时其他线程无法调用 Bank 类的任何静态 synchronized 方法。
内部锁与条件存在一些限制:
- 不能中断一个正在尝试获得锁的线程。
- 不能指定尝试获得锁的超时时间。
- 每个锁只有一个条件(低效)。
尽可能优先使用 java.util.concurrent 包中的某种机制,其次考虑 synchronized,最后考虑 Lock / Condition。
6) 同步块
线程可以调用 synchronized() 获得内部对象锁,除此以外,进入一个 同步块(synchronized block) 时也将获得内部对象锁。
// 同步块的格式
synchronized (obj) {
// 关键代码段
}“专用”(ad hoc)锁:
class Bank {
private Lock lock = new Object();
// ...
public void transfer(int from, int to, double amount) {
// ...
synchronized (lock) {
accounts[from] -= amount;
accounts[to] += amount;
}
// ...
}
}此处创建 lock 只是为了使用每个 Java 对象拥有的锁。
使用同步块时,要注意锁对象。
// 有风险的代码
private final String lock = "LOCK";
// ...
synchronized (lock) { }如果这段代码在同一个程序中出现两次,锁将是同一个对象(字符串字面量共享),这可能导致死锁。
另外要避免使用基本类型包装器作为锁:
private final Integer lock = new Integer(42); // 不要用包装器作为锁如需修改一个静态字段,要从特定的类上获得锁,而不是从 getClass() 返回值上:
synchronized (MyClass.class) { staticCounter++; } // OK
synchronized (getClass()) { staticCounter++; } // Don't7) 监视器概念
监视器(monitor):一个不要求显式锁就可以保证多线程安全性的解决方案,有如下属性:
- 监视器是只包含私有字段的类。
- 监视器类的每个对象有一个关联的锁,所有方法由该锁锁定。
- 锁可以有任意多个关联的条件。
8) volatile 字段
关键字 volatile:为实例字段的同步访问提供了一种免锁机制,将字段声明为 volatile 后,编译器与 JVM 将考虑到该字段可能被另一个线程并发更新。
假设某个对象有一个 boolean 标记 done:
private boolean done;
public synchronized boolean isDone() { return done; }
public synchronized void setDone() { done = true; }如果另一个线程已经对该对象加锁,这两个方法可能阻塞,在此情况下,将字段声明为 volatile 更合适:
private volatile boolean done;
public boolean isDone() { return done; }
public void setDone() { done = true; }9) final 变量
将一个共享字段声明为 final 时可以安全地对其访问:
final Map<String, Double> accounts = new HashMap<>();其他线程将在构造器完成构造后才能看到 accounts,如果不声明为 final,其他线程可能看到 null,而不是新构造的 HashMap。