网站建设方式丨金手指排名26,同一个ip的网站做链接有用,贵港建设局网站查询,免费建站软件有哪些操作系统调度算法实战#xff1a;从FCFS到多级反馈队列的保姆级代码实现 你是否曾好奇#xff0c;当你同时打开浏览器、音乐播放器和代码编辑器时#xff0c;电脑是如何决定哪个程序先运行#xff0c;哪个程序后运行#xff0c;又是如何保证系统不会卡死的#xff1f;这背…操作系统调度算法实战从FCFS到多级反馈队列的保姆级代码实现你是否曾好奇当你同时打开浏览器、音乐播放器和代码编辑器时电脑是如何决定哪个程序先运行哪个程序后运行又是如何保证系统不会卡死的这背后隐藏的正是操作系统调度算法的精妙设计。对于每一位编程初学者和操作系统学习者而言理解这些算法不仅是掌握计算机科学核心原理的关键更是提升编程思维、写出高效并发程序的基础。然而仅仅停留在理论层面看着书本上那些抽象的“就绪队列”、“时间片”概念总感觉隔着一层迷雾。真正的理解往往始于动手实践。本文将带你从零开始用代码亲手搭建一个进程调度模拟器将FCFS、SJF、RR、优先级调度乃至复杂的多级反馈队列MLFQ算法从纸上谈兵变为可运行、可观察、可调试的Python程序。我们将聚焦于如何用最直观的数据结构如队列、列表来模拟进程状态的变化如何处理时间片轮转中的边界条件以及如何动态调整优先级。通过一行行代码的构建你将不仅理解算法“是什么”更能深刻体会它们“为什么”这样设计以及在实际系统中“如何”工作。1. 环境准备与核心概念建模在开始编码之前我们需要建立一个清晰的模拟环境。操作系统的调度发生在内核中管理着名为“进程”的实体。在我们的模拟器中一个进程至少需要包含以下信息一个唯一的标识符PID、到达系统的时间、需要CPU服务的时间即“突发时间”或“运行时间”、以及可选的优先级。此外为了计算各种评估指标我们还需要记录它的开始运行时间、完成时间从而推导出周转时间、等待时间等。我们将使用Python的dataclass来优雅地定义进程。dataclass能自动生成构造函数、__repr__等方法让我们的代码更简洁。from dataclasses import dataclass from enum import Enum from typing import Optional class ProcessState(Enum): 进程状态枚举 NEW 新建 # 已创建未进入就绪队列 READY 就绪 # 在就绪队列中等待CPU RUNNING 运行 # 正在CPU上执行 TERMINATED 终止 # 已完成执行 dataclass class Process: 进程数据类 pid: int # 进程ID arrival_time: int # 到达时间 burst_time: int # 所需CPU时间总服务时间 remaining_time: int # 剩余CPU时间用于时间片轮转等 priority: int 0 # 优先级数字越小优先级越高默认0 state: ProcessState ProcessState.NEW start_time: Optional[int] None # 首次开始运行的时间 finish_time: Optional[int] None # 完成时间 property def turnaround_time(self) - int: 周转时间 完成时间 - 到达时间 if self.finish_time is None: return 0 return self.finish_time - self.arrival_time property def waiting_time(self) - int: 等待时间 周转时间 - 突发时间 return self.turnaround_time - self.burst_time有了进程的定义我们还需要一个“就绪队列”来存放所有等待CPU的进程。不同的调度算法对队列的操作入队、出队、排序方式截然不同。为了模拟时间的推进我们引入一个全局的“当前时间”时钟。调度器的工作就是在每个时间点根据当前就绪队列中的进程和正在运行的进程依据特定算法决定下一步该做什么是继续执行当前进程还是切换到另一个进程提示在模拟中“时间”通常是离散的整数时间单位。一个时间单位可以代表实际的1毫秒、1微秒或者仅仅是一个逻辑上的步进。这简化了我们的模型便于跟踪和调试。2. 先来先服务FCFS与短作业优先SJF算法实现先来先服务FCFS是最直观、最简单的调度算法。它的核心思想就像一个超市的普通收银台谁先来排队谁就先得到服务。在代码中这意味着我们的就绪队列就是一个简单的先进先出FIFO队列。新到达的进程被添加到队尾调度器总是从队头选取进程投入运行并且一旦开始运行就会一直运行到该进程完成除非它主动放弃CPU比如进行I/O操作。这种“非抢占”的特性使得实现非常直接。让我们看看FCFS的核心调度循环是如何工作的import queue from typing import List def fcfs_scheduler(processes: List[Process]) - List[Process]: FCFS调度算法模拟 返回按调度顺序排列的进程列表并更新每个进程的统计信息。 # 按到达时间排序模拟进程依次到达 processes.sort(keylambda p: p.arrival_time) ready_queue queue.Queue() current_time 0 scheduled_order [] i 0 # 用于遍历尚未到达的进程 while i len(processes) or not ready_queue.empty(): # 将当前时间及之前到达的进程加入就绪队列 while i len(processes) and processes[i].arrival_time current_time: ready_queue.put(processes[i]) processes[i].state ProcessState.READY i 1 if not ready_queue.empty(): # 从队头取出进程运行 current_process ready_queue.get() current_process.state ProcessState.RUNNING current_process.start_time current_time if current_process.start_time is None else current_process.start_time # 进程一直运行直到完成 current_time current_process.burst_time current_process.finish_time current_time current_process.state ProcessState.TERMINATED scheduled_order.append(current_process) else: # CPU空闲时间推进到下一个进程到达的时间 current_time processes[i].arrival_time return scheduled_orderFCFS的优点是公平且实现简单但它的缺点也很明显护航效应Convoy Effect。如果一个长作业最先到达后面所有的短作业都必须等待它完成导致平均等待时间变得很长。这在交互式系统中体验极差。为了优化平均等待时间短作业优先SJF算法应运而生。SJF的核心思想是优先运行预计运行时间最短的作业。这就像银行开设了“快速业务”窗口办理时间短的客户可以优先。在非抢占式SJF中调度器从就绪队列中选择剩余运行时间最短的进程投入运行并让其运行至完成。实现SJF的关键在于就绪队列不再是一个简单的FIFO队列而需要根据进程的剩余运行时间动态排序。我们可以使用一个优先队列最小堆来实现import heapq def sjf_scheduler_nonpreemptive(processes: List[Process]) - List[Process]: 非抢占式短作业优先调度算法模拟。 就绪队列使用最小堆以剩余运行时间为键。 processes.sort(keylambda p: p.arrival_time) # 最小堆元素为 (剩余时间, 到达时间, 进程对象) ready_heap [] current_time 0 scheduled_order [] i 0 while i len(processes) or ready_heap: # 将所有已到达的进程加入最小堆 while i len(processes) and processes[i].arrival_time current_time: heapq.heappush(ready_heap, (processes[i].burst_time, processes[i].arrival_time, processes[i])) processes[i].state ProcessState.READY i 1 if ready_heap: # 弹出剩余时间最短的进程 burst, arr, current_process heapq.heappop(ready_heap) current_process.state ProcessState.RUNNING current_process.start_time current_time if current_process.start_time is None else current_process.start_time current_time current_process.burst_time current_process.finish_time current_time current_process.state ProcessState.TERMINATED scheduled_order.append(current_process) else: current_time processes[i].arrival_time return scheduled_orderSJF算法在数学上可以被证明是平均等待时间最小的非抢占式调度算法。但它有一个致命的弱点饥饿Starvation。如果不断有短作业到达那么长作业可能永远得不到运行。此外作业的运行时间在实际系统中往往是难以精确预知的这限制了SJF的直接应用。3. 抢占式调度与时间片轮转RR算法交互式系统如我们日常使用的桌面操作系统要求系统对用户的操作有及时的响应。非抢占式调度算法无法满足这个需求因为一个长时间运行的计算任务会阻塞整个系统。因此抢占式调度成为必须。在抢占式调度中操作系统可以在任何时刻中断当前正在运行的进程将CPU分配给另一个更紧急或更合适的进程。最常见的抢占式调度算法就是时间片轮转Round Robin, RR。RR算法给每个进程分配一个固定的CPU时间单元称为时间片Time Quantum。进程被放入一个循环队列中调度器依次让每个进程运行一个时间片。如果进程在一个时间片内完成了它就离开系统如果没完成它会被放回就绪队列的末尾等待下一轮调度。这就像在食堂打饭每个人轮流打一勺打完一勺如果还没饱就重新排队。实现RR算法我们需要处理几个关键细节时间片的管理每个进程运行一段时间后必须被强制中断。进程状态的保存与恢复中断后需要保存进程的剩余运行时间以便下次接着运行。新进程的到达在模拟时钟推进过程中需要及时将新到达的进程加入就绪队列。下面是一个RR算法的详细实现我们使用collections.deque双端队列作为就绪队列因为它支持高效的从左侧弹出和从右侧追加操作from collections import deque def round_robin_scheduler(processes: List[Process], time_quantum: int) - List[Process]: 时间片轮转调度算法模拟。 :param time_quantum: 时间片大小 processes.sort(keylambda p: p.arrival_time) ready_queue deque() # 就绪队列 current_time 0 scheduled_order [] # 记录调度顺序用于分析和可视化 i 0 # 初始化所有进程的剩余时间 for p in processes: p.remaining_time p.burst_time while i len(processes) or ready_queue: # 将当前时间及之前到达的进程加入就绪队列 while i len(processes) and processes[i].arrival_time current_time: ready_queue.append(processes[i]) processes[i].state ProcessState.READY i 1 if not ready_queue: # CPU空闲跳到下一个进程到达的时间 current_time processes[i].arrival_time continue # 从队头取出一个进程运行 current_process ready_queue.popleft() if current_process.start_time is None: current_process.start_time current_time current_process.state ProcessState.RUNNING # 决定本次运行的时间取时间片和剩余时间的最小值 run_time min(time_quantum, current_process.remaining_time) current_process.remaining_time - run_time current_time run_time # 将在此期间新到达的进程加入队尾 while i len(processes) and processes[i].arrival_time current_time: ready_queue.append(processes[i]) processes[i].state ProcessState.READY i 1 # 检查当前进程是否完成 if current_process.remaining_time 0: current_process.finish_time current_time current_process.state ProcessState.TERMINATED scheduled_order.append(current_process) else: # 未完成放回就绪队列队尾 current_process.state ProcessState.READY ready_queue.append(current_process) return scheduled_order时间片大小的选择对RR算法的性能有巨大影响。为了直观展示这种影响我们可以用同一组进程使用不同的时间片来模拟并对比关键指标时间片大小平均周转时间平均等待时间上下文切换次数1 (过小)较高很高非常多4 (适中)较低较低中等10 (较大)接近FCFS接近FCFS少∞ (无限大)等于FCFS等于FCFS0从上表可以看出时间片太小会导致频繁的进程切换上下文切换系统开销增大真正用于运行用户程序的时间比例下降时间片太大则退化为FCFS交互性变差。一个经验法则是时间片应略大于一次典型交互所需的时间例如80%的进程能在该时间片内完成这样能在响应时间和吞吐量之间取得较好的平衡。4. 优先级调度与多级反馈队列MLFQ的融合实现纯粹的优先级调度算法为每个进程赋予一个优先级调度器总是选择优先级最高的进程运行。它可以是抢占式的当更高优先级进程到达时立即抢占CPU或非抢占式的。优先级调度的问题在于低优先级进程可能产生饥饿。一个常见的优化是使用动态优先级即进程的优先级可以随着时间变化。例如一个进程等待CPU的时间越长就逐步提高其优先级这被称为“老化Aging”技术。然而现代通用操作系统如Linux、Windows广泛使用的是一种更为复杂和智能的混合算法——多级反馈队列Multilevel Feedback Queue, MLFQ。MLFQ巧妙地融合了多种算法的思想其核心目标是在不预先知道进程行为是CPU密集型还是I/O密集型的情况下同时优化周转时间和响应时间。MLFQ的规则可以概括如下设置多个就绪队列每个队列拥有不同的优先级和不同的时间片大小。通常优先级越高的队列时间片越小例如最高优先级队列时间片为8ms下一级为16ms再下一级为32ms以此类推。新进程进入最高优先级队列。调度规则总是优先运行高优先级队列中的进程。仅当高优先级队列为空时才运行低优先级队列中的进程。时间片规则进程在某个队列中被分配一个时间片。如果它在该时间片内用完之前主动放弃CPU例如进行I/O操作则其优先级不变或可能提升并留在原队列。如果它用完了整个时间片说明它是CPU密集型则其优先级被降低移入下一级队列。周期性地提升所有进程的优先级例如每隔一段时间将所有进程移回最高优先级队列以防止饥饿。下面我们来实现一个简化但功能完整的MLFQ模拟器。我们将使用一个列表来管理多个队列每个队列是一个deque。class MultilevelFeedbackQueueScheduler: def __init__(self, num_queues: int 3, base_time_quantum: int 2): 初始化多级反馈队列调度器。 :param num_queues: 队列数量 :param base_time_quantum: 最高优先级队列的时间片基数下级队列时间片加倍。 self.num_queues num_queues self.queues [deque() for _ in range(num_queues)] self.time_quanta [base_time_quantum * (2 ** i) for i in range(num_queues)] # 记录进程当前所在的队列级别 self.process_queue_level {} self.current_time 0 self.scheduled_order [] def schedule(self, processes: List[Process]) - List[Process]: 执行MLFQ调度 processes.sort(keylambda p: p.arrival_time) for p in processes: p.remaining_time p.burst_time self.process_queue_level[p.pid] 0 # 新进程从队列0开始 i 0 # 进程列表索引 completed 0 total_processes len(processes) while completed total_processes: # 1. 将到达的进程加入最高优先级队列队列0 while i len(processes) and processes[i].arrival_time self.current_time: p processes[i] self.queues[0].append(p) p.state ProcessState.READY i 1 # 2. 从高到低查找第一个非空队列 current_queue_idx None for q_idx in range(self.num_queues): if self.queues[q_idx]: current_queue_idx q_idx break if current_queue_idx is None: # 所有队列为空时间跳到下一个进程到达 if i len(processes): self.current_time processes[i].arrival_time continue # 3. 从选中的队列头部取出进程运行 current_process self.queues[current_queue_idx].popleft() if current_process.start_time is None: current_process.start_time self.current_time current_process.state ProcessState.RUNNING # 4. 确定运行时间取当前队列时间片和进程剩余时间的最小值 time_slice self.time_quanta[current_queue_idx] run_time min(time_slice, current_process.remaining_time) current_process.remaining_time - run_time self.current_time run_time # 5. 处理在此期间新到达的进程 while i len(processes) and processes[i].arrival_time self.current_time: p processes[i] self.queues[0].append(p) p.state ProcessState.READY self.process_queue_level[p.pid] 0 i 1 # 6. 判断进程后续处理 if current_process.remaining_time 0: # 进程完成 current_process.finish_time self.current_time current_process.state ProcessState.TERMINATED self.scheduled_order.append(current_process) completed 1 # 从级别字典中移除 self.process_queue_level.pop(current_process.pid, None) else: # 进程未完成需要降级或留在原队列 # 规则如果用完了整个时间片则降级否则因I/O等主动放弃留在原队列。 # 在我们的简化模拟中假设所有进程都是CPU密集型用尽时间片则降级。 # 更复杂的模拟可以引入I/O事件。 if run_time time_slice: # 用尽时间片降级到下一队列如果存在 next_level min(current_queue_idx 1, self.num_queues - 1) self.queues[next_level].append(current_process) self.process_queue_level[current_process.pid] next_level else: # 主动放弃模拟情况放回原队列末尾 self.queues[current_queue_idx].append(current_process) current_process.state ProcessState.READY return self.scheduled_order这个MLFQ实现体现了其核心优势对交互型短作业友好它们通常在最高优先级队列的小时间片内就能完成从而获得极快的响应。对批处理长作业公平长作业会逐渐下沉到低优先级队列虽然响应变慢但能获得更长的时间片减少了上下文切换开销最终也能完成。能自适应进程行为通过观察进程是频繁放弃CPU可能是I/O密集型还是霸占CPUCPU密集型来动态调整其优先级。为了更直观地对比这几种算法的表现我们可以设计一组测试进程并用不同的调度算法运行最后输出关键性能指标进行比较。def evaluate_scheduler(scheduler_func, processes, **kwargs): 评估调度算法的性能 # 创建进程的深拷贝避免不同算法间相互影响 import copy test_processes copy.deepcopy(processes) result scheduler_func(test_processes, **kwargs) if kwargs else scheduler_func(test_processes) total_turnaround sum(p.turnaround_time for p in result) total_waiting sum(p.waiting_time for p in result) avg_turnaround total_turnaround / len(result) avg_waiting total_waiting / len(result) print(f算法: {scheduler_func.__name__}) print(f 平均周转时间: {avg_turnaround:.2f}) print(f 平均等待时间: {avg_waiting:.2f}) # 可以打印详细的进程时间线 for p in result: print(f PID {p.pid}: 到达 {p.arrival_time}, 运行 {p.burst_time}, 开始 {p.start_time}, 完成 {p.finish_time}, 周转 {p.turnaround_time}, 等待 {p.waiting_time}) print(- * 40) return avg_turnaround, avg_waiting # 测试用例 test_processes [ Process(pid1, arrival_time0, burst_time8), Process(pid2, arrival_time1, burst_time4), Process(pid3, arrival_time2, burst_time9), Process(pid4, arrival_time3, burst_time5), ] print( 调度算法性能对比 ) evaluate_scheduler(fcfs_scheduler, test_processes) evaluate_scheduler(sjf_scheduler_nonpreemptive, test_processes) evaluate_scheduler(round_robin_scheduler, test_processes, time_quantum4) mlfq MultilevelFeedbackQueueScheduler(num_queues3, base_time_quantum2) # 注意evaluate_scheduler需要适配MLFQ的类调用方式这里略作调整 test_procs_copy copy.deepcopy(test_processes) mlfq.schedule(test_procs_copy) # 计算MLFQ的结果...通过这样的实战编码和对比你不仅能记住各种算法的名字更能理解它们在不同场景下的权衡。你会发现没有一种算法是完美的操作系统的设计正是在这些权衡中寻找最适合当前硬件和应用场景的平衡点。MLFQ之所以成功正是因为它通过一套简单的规则实现了对未知负载类型的良好适应。自己动手实现一遍遇到边界条件时仔细思考如何处理比如时间片用完时进程恰好完成、就绪队列为空但还有进程未到达等情况这些细节的打磨会让你对操作系统的理解远超纸上谈兵。