1 minute read

概要

主要讨论Golang中的多线程和RPC,以及实验相关的内容。

多线程

  • 线程和进程是包含的关系,一个进程可以产生很多线程
  • 线程可以共享内存
  • 每个线程都是串行执行,就像是非线程程序
  • 每个线程都有自己的线程状态:程序计数器、寄存器、栈

Why needed?

在分布式系统中,需要并发执行。而多线程是实现并发的廉价的方式。进程切换开销太大。

进程和线程的切换问题

  • 每个进程都有属于自己的、私有的、地址连续的虚拟内存,通过页表映射到物理内存上。页表在内存中,每个进程有自己的页表。为了加速页表,还引入了高速缓存TLB。

    • TLB (Translation Look Aside Buffer)

      页表是在内存中的,有时可能比较大,为了加速遍历求page frame number的过程,会使用更加快的cache组成TLB。

      TLB在全局只有一个。

  • 每个进程有自己独立的虚拟地址空间,进程内的所有线程共享进程的虚拟地址空间。

进程切换涉及虚拟地址空间的切换而线程不会。而线程是共享所在进程的虚拟地址空间的,因此同一个进程中的线程进行线程切换时不涉及虚拟地址空间的转换。

为什么虚拟地址空间切换会比较耗时呢?
  • 因为cache和TLB会失效

    当进程切换后页表也要进行切换,页表切换后TLB就失效了,cache失效导致命中率降低,那么虚拟地址转换为物理地址就会变慢,表现出来的就是程序运行会变慢。

是否有线程的替代方案?

在单线程中写非串行逻辑。这种方式也被称为事件驱动。譬如 JavaScript。

做法:使用一个状态表保存所有活动的状态,假设是每个客户端的请求。

事件循环:在收到服务器的响应时,传入新的输入值,检查状态表中的每个活动的状态。执行每个活动的下一步。并且更新状态。依次循环完成整个状态表。

线程的挑战

共享数据

例如:有两个线程,同时做一个 n=n+1 的逻辑?有一个线程正在读取,而另外有一个线程在做自增操作?类似数据库事务中遇到的数据共享问题。

上面两个问题都遇到了 “竞争” 问题,这个会成为一个程序的 Bug。在电商领域会有超卖的风险。如何解决?

  • 使用锁(Go 中使用 sync.Mutex)
  • 避免使用共享可变数据(immutable and mutable)

线程间协调

例如:一个线程生产数据,另一个线程消费数据。生产与消费者

  • 消费者如何等待(不能总是占着 CPU)
  • 生产者如何唤醒消费者?

如何解决?

  • 使用 Go channel 通讯
  • sync.Cond
  • WaitGroup

死锁

周期性的锁检查或者是通讯(RPC 或 Go channels)

案例学习:Web 爬虫

爬虫的挑战

  • 如何利用 I/O 并发

    爬虫的很多瓶颈都是因为网络带宽的限制。在同一时刻抓取多个 URL 来提高并发(每秒抓取数量)

    • 利用多线程来提高并发
  • 避免重复抓取

    多次抓取是网络的浪费,这里需要记录访问状态

  • 要知道何时停止

    不能没有边界,遇到什么情况就需要停止继续爬取

方案

串行爬取
并发互斥爬取 concurrentMutex
  • 为每个爬取的页面创建一个线程

    并发爬取、提高爬取速度。 “go func” 可以创建一个 goroutine 并且运行该函数(可以是匿名函数)

  • 线程之间是共享变量 fetched map

    通过互斥锁的方式来控制,同时只能一个线程访问该变量

ConcurrentMutex 何时认为执行已经完成?

  • sync.WaitGroup

Wait () 等待所有的 Add () 操作,但遇见调用 Done () 时表示已经运行完成。譬如:可以用来处理等待所有的子进程处理完成。

相当于OS学的PV操作中的semaphore信号量。

并发通信爬取 ConcurrentChannel

channel 线程间通信

for y := range ch 当ch为空时会阻塞!

master和workers

  • master 是如何知道已经完成爬取?

    使用计数的方式,每个 worker 只发送一次消息到 channel

//
// Concurrent crawler with channels
//

func worker(url string, ch chan []string, fetcher Fetcher) {
  // 每个worker只处理一个url
	urls, err := fetcher.Fetch(url)
	if err != nil {
		ch <- []string{} // 避免coordinator死锁
	} else {
		ch <- urls 
	}
}

func coordinator(ch chan []string, fetcher Fetcher) {
	n := 1
	fetched := make(map[string]bool)
	for urls := range ch {
		for _, u := range urls {
			if fetched[u] == false {
				fetched[u] = true
				n += 1
				go worker(u, ch, fetcher) // 每次收到新任务才新增一个worker
			}
		}
		n -= 1
		if n == 0 {
			break
		}
	}
}

何时应该使用共享变量和锁?对比通道有什么好处?

其实所有的问题都有两种解决方案,依赖你如何去思考。

  • 状态– 共享和锁
  • 通信– 通道

远程调用(RPC)

RPC 的问题:调用失败怎么办?

例如:丢包、网络中断、服务器慢、服务器崩溃

对于连接的客户端来说,故障会表现出什么样子?

  • 客户端得不到服务端响应
  • 客户端不知道服务端是否已经收到了请求,这里有几种情况。
    • 请求未收到
    • 请求已经收到正在处理,但在返回结果时崩溃了
    • 服务端发送了结果,但是网络出现故障

简单的故障处理方式:best effort

  • Call () 时等待一段时间的响应
  • 重发请求
  • 当等待一段时间后,发现还是没有响应就放弃并返回错误。

优秀的 RPC 服务:只做一次

解决方式:RPC服务器能识别什么是重复的请求,如果是重复的请求,则返回前面生成的结果。

问题:如何识别出重复的请求?

客户端在请求时,针对每个请求,可以生成一个唯一请求 ID(request id)。当进行重发时,该 ID 不变。

Comments