进程(process) 与 线程(thread) 的区别:每个进程都拥有自己的一整套变量,线程则共享数据;线程更“轻量级”,创建、撤销一个线程比启动一个进程的开销小很多。

1. 什么是线程

实例:设计一个转账程序,同时在银行账户 0123 之间转账。

在线程中执行任务的过程:

  1. 将执行任务的代码放在一个类的 run() 中,该类需要实现 Runnable 接口。
public interface Runnable {
    void run();
}

Runnable 是一个函数式接口,可以用一个 lambda 创建实例:

Runnable r = () -> {
    // 执行任务
};
  1. 从该实例构造一个 Thread 对象:
Thread t = new Thread(r);
  1. 启动该 Thread 对象:
t.start();
Runnable r = () -> {
    try {
        for (int i = 0; i < STEPS; i++) {
            double amount = MAX_AMOUNT * Math.random();
            bank.transfer(0, 1, amount);
            Thread.sleep((int) (DELAY * Math.random()));
        }
    }
    catch (InterruptedException e) {
    }
};
Thread t = new Thread(r);
t.start();

要捕获 sleep() 可能抛出的 InterruptedException,中断用于请求终止一个线程,出现 InterruptedException()run() 将退出。

该程序还将启动第二个线程,在账户 23 之间转账,运行时发现两对账户之间的转账信息交错输出,说明它们在并发运行。

class ThreadTest {
    public static final int DELAY = 10;
    public static final int STEPS = 100;
    public static final double MAX_AMOUNT = 1000;
    
    public static void main(String[] args) {
        Bank bank = new Bank(4, 100000);
        
        Runnable task1 = () -> {
            try {
                for (int i = 0; i < STEPS; i++) {
                    double amount = MAX_AMOUNT * Math.random();
                    bank.transfer(0, 1, amount);
                    Thread.sleep((int) (DELAY * Math.random()));
                }
            }
            catch (InterruptedException e) {
            }
        };
        
        Runnable task2 = () -> {
            try {
                for (int i = 0; i < STEPS; i++) {
                    double amount = MAX_AMOUNT * Math.random();
                    bank.transfer(2, 3, amount);
                    Thread.sleep((int) (DELAY * Math.random()));
                }
            }
            catch (InterruptedException e) {
            }
        };
        
        new Thread(task1).start();
        new Thread(task2).start();
    }
}
 
class Bank {
    private final double[] accounts;
    
    public Bank(int n, double initialBalance) {
        accounts = new double[n];
        Arrays.fill(accounts, initialBalance);
    }
    
    public void transfer(int from, int to, double amount) {
        if (accounts[from] < amount) return;
        System.out.print(Thread.currentThread());
        accounts[from] -= amount;
        System.out.printf(" %10.2f from %d to %d", amount, from, to);
        accounts[to] += amount;
        System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
    }
    
    public double getTotalBalance() {
        double sum = 0;
        for (double a : accounts) sum += a;
        return sum;
    }
    
    public int size() {
        return accounts.length;
    }
}

2. 线程状态

线程的 6 种状态:

  • New 新建
  • Runnable 可运行
  • Blocked 阻塞
  • Waiting 等待
  • Timed waiting 计时等待
  • Terminated 终止

getState():获取某个线程的当前状态。

1) 新建线程

用操作符 new 创建一个新线程时,它尚未运行,其状态为 New 新建,线程运行之前还有一些基础工作要做。

2) 可运行线程

调用 start() 后,线程的状态为 Runnable 可运行,它可能在运行、也可能没在运行。

一个线程开始运行后,它不一定始终保持运行,实际上,运行中的线程有时需要暂停,使其他线程有机会运行。

线程调度的细节依赖于操作系统提供的服务:

  • 抢占式调度:给每一个可运行线程一个时间片来执行任务,时间片用完时该线程的运行权被剥夺,并给另一个线程机会以运行,对下一个线程的选择取决于优先级。所有现代桌面和服务器操作系统都采用此调度。
  • 协作式调度:线程只有在被调用 yield()、或被阻塞、或等待时才失去运行权。手机等小型设备采用此调度。

3) 阻塞和等待线程

线程处于 Blocked 阻塞Waiting 等待 状态时,它不执行代码,且消耗最少资源。由线程调度器激活时的具体细节取决于它曾如何到达非活动状态。

  • 当线程试图获取一个内部的对象锁,而该锁目前被其他线程占有,它将进入 Blocked 阻塞 状态;当所有其他线程都释放了该锁,且线程调度器允许该线程持有该锁时,它将进入非阻塞状态。
  • 当线程等待另一个线程通知调度器出现某个条件时,它将进入 Waiting 等待 状态。实际上,阻塞与等待区别不大。
  • 有些方法具有超时参数,调用它们将使线程进入 Timed Waiting 计时等待 状态,保持至超时期满或者接收到适当的通知;具有超时参数的方法有 Thread::sleep 和计时版的 Object::waitThread::joinLock::tryLockCondition::await

4) 终止线程

  • run() 正常退出时,线程自然终止。
  • run() 由于未捕获的异常退出时,线程意外终止。

3. 线程属性

1) 中断线程

interrupt():请求线程终止。

中断状态(interrupted status):每个线程都拥有的 boolean 标志,标记线程是否被中断,调用 interrupt() 将为其设置值。

Thread.currentThread():获取当前线程。

isInterrupted():检查当前线程的中断状态。

while (!Thread.currentThread().isInterrupted() && /* ? */) {
    // 操作
}

处于阻塞状态的线程不能被检查中断状态,对一个因被调用 sleep()wait() 而阻塞的线程调用 interrupt() 时,该阻塞方法(sleep() / wait())将被一个 InterruptedException 中断。

中断不意味着终止,中断一个线程只是要引起其注意,被中断的线程可以决定如何响应。重要线程应处理此异常并继续运行;更普遍的情况下,线程只将中断看作一个终止请求,此类线程的 run() 有如下形式:

Runnable r = () -> {
    try {
        // ...
        while (!Thread.currentThread().isInterrupted() && /* ? */) {
            // 操作
        }
    }
    catch (InterruptedException e) {
        // 处理异常
    }
    finally {
        // 清理
    }
};

如果每次工作周期后都调用 sleep()(或类似方法),那么 isInterrupted() 检查既没有必要也没有用处;如果设置了中断状态,再调用 sleep(),线程将不会休眠,而是清除中断状态并抛出 InterruptedException。因此,如果循环调用 sleep(),不要进行 isInterrupted() 检查,而应捕获 InterruptedException

Runnable r = () -> {
    try {
        // ...
        while (/* ? */) {
            // 操作/
            Thread.sleep(delay);
        }
    }
    catch (InterruptedException e) {
        // 处理异常
    }
    finally {
        // 清理
    }
};

两个相似的方法 interrupted()isInterrupted()

  • interrupted() 是一个静态方法,它检查当前线程是否被中断,并清除该线程的中断状态。
  • isInterrupted() 是一个实例方法,它检查是否有线程被中断,不会改变线程的中断状态。

不要抑制 InterruptedException,如果想不出如何处理异常,还有如下选择:

  • catch 子句中调用 Thread.currentThread().interrupt() 设置中断状态:
void mySubTask() {
    // ...
    try {
        sleep(delay);
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    // ...
}

这样一来,方法调用者可以检测到中断状态。

  • 标记 throws InterruptedException,并删除 try 子句,将异常传递给方法调用者。

2) 守护线程

守护线程(daemon thread):为其他线程提供服务的线程。

setDaemon():将一个线程转换为守护线程,调用 t.setDaemon(true)

守护线程的例子:

  • 计时器线程,它定时向其他线程发送”计时器嘀嗒“信号。
  • 清空过时缓存项的线程。

只剩下守护线程时,JVM 将退出。

3) 线程名

setName():为线程命名。

Thread t = new Thread(runnable);
t.setName("Web crawler");

4) 未捕获异常的处理器

P578-579

5) 线程优先级

线程优先级曾经很有用,不过现在最好不要再使用。

默认情况下,线程将继承构造它的线程的优先级。

setPriority():设置线程的优先级,MIN_PRIORITY1MAX_PRIORITY10NORM_PRIORITY5

4. 同步

竞态条件(race condition):两个线程同时调用修改同一个对象状态的方法时,互相覆盖导致对象被破坏。

1) 竞态条件的一个例子

为了避免多线程破坏共享数据,需要 同步存取(synchronize the access)。

// 上例中的 Bank::transfer
public void transfer(int from, int to, double amount) {
    if (accounts[from] < amount) return;
    System.out.print(Thread.currentThread());
    accounts[from] -= amount;
    System.out.printf(" %10.2f from %d to %d", amount, from, to);
    accounts[to] += amount;
    System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
}

与此前不同,现要随机选择转账的源账户与目标账户。

/*更改后的run*/
Runnable task = () -> {
    try {
        while (true) {
            int fromAccount = (int) (bank.size() * Math.random());
            int toAccount = (int) (bank.size() * Math.random());
            double amount = MAX_AMOUNT * Math.random();
            bank.transfer(fromAccount, toAccount, amount);
            Thread.sleep((int) (DELAY * Math.random()));
        }
    }
    catch (InterruptedException e) {
    }
};
Thread[Thread-0,5,main]     205.19 from 11 to 17 Total Balance:  100000.00
Thread[Thread-0,5,main]      58.81 from 83 to 10 Total Balance:  100000.00
Thread[Thread-1,5,main]      17.64 from 76 to 44 Total Balance:  100000.00
Thread[Thread-1,5,main]     103.91 from 10 to 26 Total Balance:  100000.00
Thread[Thread-0,5,main]Thread[Thread-1,5,main]     898.76 from 72 to 73 Total Balance:   99022.52
     977.48 from 99 to 93 Total Balance:  100000.00
Thread[Thread-1,5,main]     807.07 from 60 to 88 Total Balance:  100000.00
Thread[Thread-0,5,main]     718.80 from 73 to 59 Total Balance:  100000.00
Thread[Thread-1,5,main]     210.20 from 27 to 70 Total Balance:  100000.00
Thread[Thread-1,5,main]     346.46 from 62 to 44 Total Balance:  100000.00
Thread[Thread-1,5,main]     915.42 from 26 to 5 Total Balance:  100000.00
Thread[Thread-1,5,main]Thread[Thread-0,5,main]     262.67 from 12 to 58 Total Balance:   99860.93
     139.07 from 32 to 62 Total Balance:  100000.00
Thread[Thread-0,5,main]     816.29 from 74 to 21 Total Balance:  100000.00

可以看到,这里出现了错误,对于最初的几次交易,所有账户的总金额保持在 100000,不过运行几次之后,总金额出现了错误。

2) 竞态条件详解

在上例中,错误发生于两个线程同时试图更新同一个账户时,例如,它们可能同时执行指令 accounts[toAccount] += amount,问题在于这个调用不是原子操作,而是分步操作:

  1. accounts[toAccount] 加载到寄存器。
  2. 增加 amount
  3. 将结果写回 accounts[toAccount]

假设第 1 个线程执行步骤 1、2 后其运行权被抢占,第 2 个线程被唤醒,执行完全部操作后,第 1 个线程被唤醒并完成刚才余下的步骤 3,此时第 2 个线程的修改被第 1 个进程所覆盖,产生错误。

如果能确保线程失去控制权之前方法已经运行完成,则可以避免这种错误。

3) 锁对象

有两种机制可以防止并发访问同一个代码块:

  • 关键字 synchronized
  • ReentrantLock 类。

ReentrantLock 类保护代码块的基本结构如下:

myLock.lock(); // 一个 ReentrantLock 类对象
try {
    // 关键代码段
}
finally {
    myLock.unlock(); // 确保锁被解锁
}

一旦一个线程调用 lock(),其他线程调用 lock() 时将被暂停,直到第一个线程被解锁,此结构确保同一时间最多只有一个线程访问关键代码段。

必须将 unlock() 的调用包含在 finally 中,无论是否有异常抛出都必须解锁,否则其他线程将永远阻塞。

class Bank {
    private Lock bankLock = new ReentrantLock();
    // ...
    public void transfer(int from, int to, double amount) {
        bankLock.lock();
        try {
            System.out.print(Thread.currentThread());
            accounts[from] -= amount;
            System.out.printf(" %10.2f from %d to %d", amount, from, to);
            accounts[to] += amount;
            System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
        }
        finally {
            bankLock.unlock();
        }
    }
    // ...
}

bankLock 称为 重入锁(reentrant lock),因为线程可以反复获得已拥有的锁。

锁有一个 持有计数(hold count) 来记录对 lock() 的嵌套调用次数,线程每次调用 lock() 后都要相应调用一次 unlock() 解锁。

由一个锁保护的代码段可以调用另一个同样使用该锁的方法,例如,transfer() 调用 getTotalBalance() 时,也将锁定 bankLock,此时它的持有计数为 2getTotalBalance() 退出时,持有计数变为 1transfer() 退出时,持有计数变为 0bankLock 被解锁。

4) 条件对象

有时,线程进入临界区后发现只有满足了某个条件后它才能运行,可以使用 条件对象(condition object) 来管理那些获得了锁却不能工作的线程。

现在改进转账程序,限制一个账户转出资金的前提是它有足够的资金。

这段代码是不可取的:

if (bank.getBalance(from) >= amount) bank.transfer(from, to, amount);

在通过测试之后、调用 transfer() 之前,当前线程有可能被中断,之后它继续运行时,账户可能已经余额不足。

必须确保在检查余额与转账操作之间没有其他线程修改余额,可以使用锁来保护此操作:

public void transfer(int from, int to, double amount) {
    bankLock.lock();
    try {
        while (accounts[from] < amount) {
            // 等待其他线程转入资金
        }
        // 转账操作
    }
    finally {
        bankLock.unlock();
    }
}

当账户中没有足够金额时,程序将等待其他线程为其增加足够的金额,但是此线程拥有对 bankLock 的独占访问权,其他线程没有机会进行转账,这里就要引入条件对象。

条件对象的类型为 Condition,由对一个锁调用 newCondition() 新建,一个锁可以关联任意数量的条件对象,条件对象应该有一个合适的名称以反映它表示的条件。

class Bank {
    private Condition sufficientFunds; // 表示资金充足的状态
    // ...
    public Bank() {
        // ...
        sufficientFunds = bankLock.newCondition();
    }
    // ...
}

如果检查发现金额不足,它将调用:

sufficientFunds.await();

当前线程被暂停并解锁,以允许其他线程执行。

等待获得锁的线程和被调用 await() 的线程存在本质上的不同,如果线程被调用 await(),它将进入该条件的 等待集(wait set),即使锁可用时,它也不会进入可运行状态,直到另一个线程对该条件对象调用 signalAll()

signalAll():通知在等待某条件满足的线程,现在条件有可能满足(而不是已经满足),线程应重新测试条件是否满足;它只是解除所有等待中线程的阻塞状态、使它们可以在当前线程解锁后竞争访问对象,而不是激活线程。

此例中,当另一个线程完成转账时,应该对 sufficientFunds 调用 signalAll()

sufficientFunds.signalAll();

此调用将激活正在等待条件的所有线程,从等待集中移出的线程将进入可运行状态,由调度器将它们再次激活。一旦锁可用,它们中的一个将从 await() 调用返回,锁定该锁,并从之前暂停处继续执行。

signal():随机选择等待集中的一个线程,解除其阻塞状态,如果该线程发现条件仍未满足,将再次阻塞,这比解除所有线程的阻塞状态更高效,但也存在风险。

一个线程必须锁定了一个带条件的锁,它才能对那些条件调用 await() / signalAll() / signal()

await() 调用通常放在 while 内:

while (!(/* 条件满足 */)) condition.await();

最终需要由其他某个线程调用 signalAll(),因为当对一个线程调用 await() 时,它无法自行重新激活,如果没有被其他线程激活,它将永远无法运行,这就是 死锁(deadlock)。如果所有线程都被阻塞,没有任何线程可以解锁其他线程的阻塞状态,程序将永远挂起。

如果一个对象的状态发生变化可能导致条件被满足,就应调用 signalAll()

// 每次完成转账后都调用 signalAll
public void transfer(int from, int to, double amount) {
    bankLock.lock();
    try {
        while (accounts[from] < amount) sufficientFunds.await();
        // 转账
        sufficientFunds.signalAll();
    }
    finally {
        bankLock.unlock();
    }
}
class Bank {
    private final double[] accounts;
    private Lock bankLock;
    private Condition sufficientFunds;
    
    public Bank(int n, double initialBalance) {
        accounts = new double[n];
        Arrays.fill(accounts, initialBalance);
        bankLock = new ReentrantLock();
        sufficientFunds = bankLock.newCondition();
    }
    
    public void transfer(int from, int to, double amount) throws InterruptedException {
        bankLock.lock();
        try {
            while (accounts[from] < amount) sufficientFunds.await();
            System.out.print(Thread.currentThread());
            accounts[from] -= amount;
            System.out.printf(" %10.2f from %d to %d", amount, from, to);
            accounts[to] += amount;
            System.out.printf(" Total Balance: %10.2f%n", getTotalBalance());
            sufficientFunds.signalAll();
        }
        finally {
            bankLock.unlock();
        }
    }
    
    public double getTotalBalance() {
        bankLock.lock();
        try {
            double sum = 0;
            for (double a : accounts) sum += a;
            return sum;
        }
        finally {
            bankLock.unlock();
        }
    }
    
    public int size() {
        return accounts.length;
    }
}

锁与条件小结:

  • 锁用来保护代码段,一次只允许一个线程执行被保护的代码。
  • 锁可以管理试图进入被保护代码段的线程。
  • 一个锁可以有一个或多个关联的条件对象。
  • 每个条件对象管理那些已经进入被保护代码段但还不能运行的线程。

5) synchronized 关键字

每个对象都有一个 内部锁(intrinsic lock),如果将一个方法声明为 synchronized,对象的内部锁将保护该方法,线程在调用它之前必须获得内部对象锁。

public synchronized void method() { }
 
// 等价于
 
public void method() {
    this.intrinsicLock.lock();
    try {
        // ...
    }
    finally {
        this.intrinsicLock.unlock();
    }
}

对于上例,可以将 transfer() 声明为 synchronized

内部对象锁只能有一个关联条件。wait() 对应于 await()notifyAll() / notify() 对应于 signalAll() / signal()

class Bank {
    // ...
    
    public synchronized void transfer(int from, int to, double amount) throws InterruptedException {
        while (accounts[from] < amount) wait();
        // ...
        notifyAll();
    }
    
    public synchronized double getTotalBalance() { }
}

如果将静态方法声明为 synchronized,调用时,它将获得关联类对象的内部锁。例如,如果 Bank 类有一个静态 synchronized 方法,调用它时将锁定 Bank 对象的内部锁,此时其他线程无法调用 Bank 类的任何静态 synchronized 方法。

内部锁与条件存在一些限制:

  • 不能中断一个正在尝试获得锁的线程。
  • 不能指定尝试获得锁的超时时间。
  • 每个锁只有一个条件(低效)。

尽可能优先使用 java.util.concurrent 包中的某种机制,其次考虑 synchronized,最后考虑 Lock / Condition

6) 同步块

线程可以调用 synchronized() 获得内部对象锁,除此以外,进入一个 同步块(synchronized block) 时也将获得内部对象锁。

// 同步块的格式
synchronized (obj) {
    // 关键代码段
}

“专用”(ad hoc)锁:

class Bank {
    private Lock lock = new Object();
    // ...
    public void transfer(int from, int to, double amount) {
        // ...
        synchronized (lock) {
            accounts[from] -= amount;
            accounts[to] += amount;
        }
        // ...
    }
}

此处创建 lock 只是为了使用每个 Java 对象拥有的锁。

使用同步块时,要注意锁对象。

// 有风险的代码
private final String lock = "LOCK";
// ...
synchronized (lock) { }

如果这段代码在同一个程序中出现两次,锁将是同一个对象(字符串字面量共享),这可能导致死锁。

另外要避免使用基本类型包装器作为锁:

private final Integer lock = new Integer(42); // 不要用包装器作为锁

如需修改一个静态字段,要从特定的类上获得锁,而不是从 getClass() 返回值上:

synchronized (MyClass.class) { staticCounter++; } // OK
synchronized (getClass()) { staticCounter++; } // Don't

7) 监视器概念

监视器(monitor):一个不要求显式锁就可以保证多线程安全性的解决方案,有如下属性:

  • 监视器是只包含私有字段的类。
  • 监视器类的每个对象有一个关联的锁,所有方法由该锁锁定。
  • 锁可以有任意多个关联的条件。

8) volatile 字段

关键字 volatile:为实例字段的同步访问提供了一种免锁机制,将字段声明为 volatile 后,编译器与 JVM 将考虑到该字段可能被另一个线程并发更新。

假设某个对象有一个 boolean 标记 done

private boolean done;
public synchronized boolean isDone() { return done; }
public synchronized void setDone() { done = true; }

如果另一个线程已经对该对象加锁,这两个方法可能阻塞,在此情况下,将字段声明为 volatile 更合适:

private volatile boolean done;
public boolean isDone() { return done; }
public void setDone() { done = true; }

9) final 变量

将一个共享字段声明为 final 时可以安全地对其访问:

final Map<String, Double> accounts = new HashMap<>();

其他线程将在构造器完成构造后才能看到 accounts,如果不声明为 final,其他线程可能看到 null,而不是新构造的 HashMap

10) 原子性

5. 线程安全的集合

6. 任务和线程池

7. 异步计算

8. 进程