怎样做电影网站房地产建设企业网站
怎样做电影网站,房地产建设企业网站,台州外包加工网,如何做电商1. 环境准备#xff1a;给你的Ubuntu 20.04“体检”
在开始动手安装Kafka之前#xff0c;我们得先确保你的Ubuntu 20.04系统是“健康”的#xff0c;并且准备好了所有必要的“营养品”。这就像你要组装一台电脑#xff0c;得先确认主板、电源、CPU都到位了#xff0c;而且…1. 环境准备给你的Ubuntu 20.04“体检”在开始动手安装Kafka之前我们得先确保你的Ubuntu 20.04系统是“健康”的并且准备好了所有必要的“营养品”。这就像你要组装一台电脑得先确认主板、电源、CPU都到位了而且兼容。对于Kafka来说它最核心的依赖就是Java运行环境因为Kafka本身就是用Scala和Java写的。另外虽然Kafka新版本2.8.0以后内置了KRaft模式可以不用ZooKeeper但为了稳定和兼容性以及大多数生产环境的现状我们这次还是采用经典的“Kafka ZooKeeper”组合。别担心跟着我的步骤来一点都不复杂。首先打开你的终端。我习惯用快捷键Ctrl Alt T你也可以在应用菜单里找。第一步更新一下系统的软件包列表这是一个好习惯能确保我们安装的是最新、最稳定的版本。sudo apt update更新完列表我们来安装Java。Kafka需要Java 8或更高版本。Ubuntu 20.04的默认仓库里提供了OpenJDK 11这是一个非常稳定且免费的选择。运行下面这条命令sudo apt install -y openjdk-11-jdk安装完成后验证一下Java是否安装成功以及版本是否正确java -version如果终端里显示了类似“openjdk version “11.0.xx””的信息那就恭喜你第一步成功了。接下来是ZooKeeper。ZooKeeper是Kafka用来管理集群元数据、控制器选举等的“大管家”必须得先启动它。我们可以直接用Kafka包里自带的ZooKeeper这样版本最匹配也最省事。所以我们暂时不需要单独安装ZooKeeper等下载完Kafka直接用里面的脚本启动就行。至此你的系统“体检”就通过了我们可以进入下一个环节下载和解压Kafka。2. 下载与安装Kafka获取核心“引擎”现在我们要把Kafka这个“引擎”下载到本地。我推荐直接去Apache Kafka的官网下载这样最安全版本也最新。我们这次选用Kafka 3.6.0版本写这篇文章时的稳定版它兼容Scala 2.12。你可以用浏览器打开下载页面但我更习惯在终端里用wget命令直接下载这样更快而且不用切换窗口。首先找一个你喜欢的目录来存放Kafka。我一般会在用户主目录下创建一个software或者apps文件夹专门放这些中间件。这里我以主目录为例cd ~ mkdir -p software cd software然后使用wget命令下载Kafka的压缩包wget https://dlcdn.apache.org/kafka/3.6.0/kafka_2.12-3.6.0.tgz如果下载速度慢可以尝试换成国内的镜像源比如清华大学的镜像把链接里的dlcdn.apache.org换成mirrors.tuna.tsinghua.edu.cn/apache/kafka。下载完成后你会看到一个.tgz的压缩文件。接下来就是解压它并给它改个简单好记的名字。# 解压压缩包 tar -zxvf kafka_2.12-3.6.0.tgz # 将解压后的文件夹重命名为 kafka方便后续操作 mv kafka_2.12-3.6.0 kafka # 进入kafka目录看看 cd kafka pwd这个pwd命令会打印出当前目录的绝对路径比如/home/你的用户名/software/kafka。请记下这个路径我们等下修改配置文件时会用到。现在Kafka的所有文件都已经静静地躺在你的硬盘里了但还不能直接跑我们需要对它进行一些“个性化设置”也就是修改配置文件。3. 关键配置调整让Kafka“听话”地跑起来Kafka的默认配置是为通用场景设计的但在我们自己的机器上跑尤其是学习环境最好根据实际情况调整几个关键参数。这样能避免一些常见问题比如日志把磁盘塞满或者外部程序连接不上。配置文件就在我们刚才进入的kafka/config目录下核心文件是server.properties。我们用文本编辑器打开它比如vim或者nano。我个人习惯用nano对新手更友好。nano config/server.properties打开文件后你会看到很多配置项别被吓到我们只需要关注和修改其中几个。用键盘方向键移动光标找到需要修改的地方。第一数据保存策略。Kafka默认会永久保存所有消息这对于学习或测试环境来说很容易积累大量无用数据占用磁盘空间。我们需要设置数据的保留时间和大小。 找到log.retention.hours和log.retention.bytes如果没有就自己添加。我建议这样设置log.retention.hours48 log.retention.bytes1073741824这表示消息最多保存48小时或者日志文件总大小超过1GB1073741824字节后旧数据就会被清理。你可以根据自己磁盘空间调整。第二日志存储路径。默认日志存在/tmp目录下这个目录在系统重启后可能会被清理导致数据丢失。我们应该把它改到一个固定的位置。找到log.dirs修改为你想要的路径。比如就放在我们kafka目录下新建一个kafka-logs文件夹log.dirs/home/你的用户名/software/kafka/kafka-logs请务必将“你的用户名”替换成你实际的用户名或者使用你之前pwd命令看到的kafka根目录路径。第三也是新手最容易踩坑的地方监听器Listeners配置。这个配置决定了Kafka如何被访问。默认配置可能只允许本机localhost访问。如果你以后想用其他机器上的工具比如后面会提到的Kafka Assistant来连接或者在本机用IP地址访问就需要修改这里。找到listeners和advertised.listeners修改为listenersPLAINTEXT://0.0.0.0:9092 advertised.listenersPLAINTEXT://你的机器内网IP:9092listenersPLAINTEXT://0.0.0.0:9092表示Kafka会在所有网络接口上监听9092端口。advertised.listeners是告诉客户端应该连接哪个地址。这里填你的Ubuntu虚拟机的内网IP地址你可以用hostname -I命令查看。如果你只在本机测试用127.0.0.1或localhost也可以。修改完成后按CtrlX然后按Y再按回车保存并退出nano编辑器。4. 启动服务与验证点燃“引擎”看状态配置搞定现在可以启动我们的服务了。记住顺序先启动“大管家”ZooKeeper再启动Kafka本身。我们使用Kafka自带的脚本来启动并且让它们在后台运行这样我们就能继续使用当前终端窗口。第一步启动ZooKeeper。bin/zookeeper-server-start.sh config/zookeeper.properties 命令最后的符号表示在后台运行。执行后你会看到一长串日志输出不用慌这是ZooKeeper在正常启动。只要最后没有明显的错误ERROR信息并且终端回到了命令输入提示符就基本没问题。第二步启动Kafka Broker。bin/kafka-server-start.sh config/server.properties 同样等待它输出一阵日志回到命令行提示符。现在两个服务都应该在后台运行了。怎么验证呢最直接的方式是查看进程。# 查看ZooKeeper进程 ps -ef | grep zookeeper # 查看Kafka进程 ps -ef | grep kafka如果能看到带有zookeeper.properties和server.properties的Java进程那就说明启动成功了。有时候你可能需要等上几秒钟让服务完全初始化。如果启动失败最常见的原因就是端口被占用比如之前启动过没关掉或者Java环境有问题。可以回头去检查一下server.properties的配置是否有拼写错误路径是否正确。还有一个更专业的检查方法就是查看Kafka的日志。日志文件默认在我们刚才配置的log.dirs目录下或者你也可以直接看启动时终端输出的日志末尾有没有错误。第一次启动时Kafka会进行一些元数据初始化稍微慢一点是正常的。看到两个服务都成功运行后那种感觉就像第一次成功点亮了自己组装的电脑主机非常有成就感。接下来我们就可以真正地“玩”一下Kafka了。5. 基础功能实战创建主题、发送与接收消息服务跑起来了现在我们来试试Kafka最核心的功能生产消息和消费消息。但在那之前我们需要先创建一个“主题”。你可以把主题理解成一个分类或者一个消息队列的名字生产者和消费者都需要指定同一个主题才能通信。我们用Kafka自带的命令行工具来操作这些工具都在bin/目录下。创建主题我们创建一个名为test-topic的主题为了学习我们给它设置2个分区和1个副本因子。分区是Kafka实现高并发和水平扩展的关键一个主题可以分成多个分区分布在不同的Broker上。副本因子是为了数据安全1表示只有一份数据没有副本对于单机学习环境足够了。bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic test-topic --partitions 2 --replication-factor 1执行后如果看到“Created topic test-topic.”的提示就成功了。这里--bootstrap-server参数指定了要连接的Kafka服务器地址和端口就是我们配置的localhost:9092。查看已有主题bin/kafka-topics.sh --list --bootstrap-server localhost:9092你应该能看到刚刚创建的test-topic在列表中。发送消息启动一个生产者现在我们来模拟一个应用程序向test-topic发送消息。下面这个命令会启动一个控制台生产者它等待你在终端里输入内容每输入一行文字并按回车就发送一条消息。bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic命令执行后光标会停在新的一行等待你输入。你可以试着输入几条消息比如Hello Kafka! This is my first message. Learning Kafka on Ubuntu 20.04 is fun.每输入完一行按回车发送。你可以多输入几条输完后先不要关闭这个终端窗口按CtrlC可以停止生产者。接收消息启动一个消费者再打开一个新的终端窗口CtrlAltT再开一个切换到Kafka目录然后启动一个控制台消费者让它从test-topic的最开始读取消息。cd ~/software/kafka bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning神奇的事情发生了你刚才在生产者终端里输入的所有消息是不是一条一条地在消费者终端里打印出来了--from-beginning参数告诉消费者“给我从这个主题最早的消息开始读。” 如果不加这个参数消费者默认只会接收启动后新产生的消息。现在你可以回到生产者的终端继续输入新消息比如Another new message!然后观察消费者的终端它会几乎实时地显示出这条新消息。这就是Kafka最基本的发布/订阅模式。你可以多开几个消费者终端试试看看消息是如何被分发的。这就是Kafka的核心魅力所在解耦、异步、可靠的消息传递。通过这几个简单的命令你已经完成了消息从生产到消费的完整流程是不是比想象中简单6. 使用图形化工具管理KafkaKafka Assistant初体验命令行虽然强大但有个可视化工具来查看和管理Kafka的各类信息对于理解和调试会更直观。这里我推荐一个叫Kafka Assistant的客户端工具类似Kafka Tool、Offset Explorer。它不是Kafka官方出品但界面友好功能对于初学者来说足够用了。你可以去它的官网下载适合你操作系统的版本比如Windows、macOS、Linux。这里假设你已经在另一台Windows电脑上下载并安装了它。打开Kafka Assistant通常第一步是创建一个新的连接。你需要填写几个关键信息连接名称 随便起比如 “My Ubuntu Kafka”。集群地址 这里要填你Ubuntu虚拟机的IP地址和端口。如果你在配置server.properties时advertised.listeners写的是IP比如192.168.1.100那就填192.168.1.100:9092。如果写的是localhost那这个工具必须安装在Ubuntu本机上才能连接。其他选项 一般保持默认即可我们用的是PLAINTEXT协议没有开启SSL或SASL认证。点击连接如果一切配置正确你应该能成功连接到你的Kafka单节点“集群”。连接成功后工具界面通常会分为左右两栏。左边是树形结构展示集群、Broker、主题Topics、消费者组Consumer Groups等。查看主题详情在左边找到并点击test-topic右边可能会显示这个主题的详细信息包括分区数Partitions: 2、副本因子、以及每个分区的Leader副本在哪个Broker上应该都是Broker ID 0。这比命令行查看要直观得多。查看消息很多工具都提供了直接浏览主题内消息的功能。你可能会找到一个“查看消息”或“Browse Messages”的按钮。点击它选择test-topic你就能看到之前通过命令行发送的所有消息包括键、值、偏移量、时间戳等信息。你可以清晰地看到消息是如何分布在不同分区里的。查看消费者组我们之前用控制台消费者消费消息时Kafka会自动生成一个消费者组。在工具的“消费者组”列表里你应该能看到一个名字类似console-consumer-开头的组。点进去可以看到这个组的成员、每个分区当前的消费偏移量等信息。这对于监控消费进度非常有帮助。使用图形化工具的最大好处是“一目了然”。你不需要记住复杂的命令行参数就能快速了解集群状态、主题结构、消息堆积情况。这对于学习阶段建立对Kafka的直观认知以及后续开发调试都是一个很好的补充。当然生产环境下的运维可能更依赖脚本和监控系统但有一个趁手的GUI工具总是让人心情愉悦。7. 集成开发环境测试在Idea里玩转Kafka作为开发者我们最终是要在代码里使用Kafka的。这里我用最流行的Java IDE——IntelliJ IDEA为例带你快速走一遍在Java项目中生产和消费消息的流程。即使你不用Java这个思路也是相通的。第一步创建项目并引入依赖。创建一个新的Maven或Gradle项目。在项目的依赖管理文件pom.xml或build.gradle中添加Kafka客户端的依赖。以Maven为例dependency groupIdorg.apache.kafka/groupId artifactIdkafka-clients/artifactId version3.6.0/version !-- 版本号尽量与服务器一致 -- /dependency第二步编写生产者代码。创建一个Java类比如SimpleProducer。核心是配置Properties对象然后创建KafkaProducer。import org.apache.kafka.clients.producer.*; import java.util.Properties; public class SimpleProducer { public static void main(String[] args) { // 1. 配置生产者参数 Properties props new Properties(); props.put(bootstrap.servers, 192.168.1.100:9092); // 你的Kafka地址 props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // 可选设置acksall是最可靠的 props.put(acks, all); // 2. 创建生产者实例 try (ProducerString, String producer new KafkaProducer(props)) { // 3. 构造并发送消息 for (int i 0; i 10; i) { String key key- i; String value Message value from Idea - i; ProducerRecordString, String record new ProducerRecord(test-topic, key, value); // 发送消息并设置回调函数异步 producer.send(record, (metadata, exception) - { if (exception null) { System.out.println(消息发送成功主题: metadata.topic() , 分区: metadata.partition() , 偏移量: metadata.offset()); } else { exception.printStackTrace(); } }); } // 确保所有消息都发送完成 producer.flush(); System.out.println(所有消息已发送完毕。); } } }运行这段代码如果控制台打印出消息发送成功的日志并且你在Kafka Assistant或者之前启动的控制台消费者里能看到这些新消息那就说明生产者配置成功了。第三步编写消费者代码。再创建一个SimpleConsumer类。import org.apache.kafka.clients.consumer.*; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args) { // 1. 配置消费者参数 Properties props new Properties(); props.put(bootstrap.servers, 192.168.1.100:9092); props.put(group.id, idea-consumer-group); // 消费者组名可以自定义 props.put(key.deserializer, org.apache.kafka.common.serialization.StringDeserializer); props.put(value.deserializer, org.apache.kafka.common.serialization.StringDeserializer); // 从最早的消息开始消费仅第一次时有效 props.put(auto.offset.reset, earliest); // 是否自动提交偏移量 props.put(enable.auto.commit, true); // 2. 创建消费者实例并订阅主题 try (ConsumerString, String consumer new KafkaConsumer(props)) { consumer.subscribe(Collections.singletonList(test-topic)); // 3. 循环拉取消息 while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecordString, String record : records) { System.out.printf(收到消息 - 分区: %d, 偏移量: %d, 键: %s, 值: %s%n, record.partition(), record.offset(), record.key(), record.value()); } } } } }运行消费者代码它会持续运行并打印出从test-topic消费到的消息。你可以看到每条消息属于哪个分区、偏移量是多少。通过这个简单的例子你就能理解如何在真实的业务代码中集成Kafka了。在实际项目中你可能需要更精细地控制偏移量提交、处理消费异常、配合Spring框架等但最核心的Producer和ConsumerAPI的使用方式就是这样的。自己动手敲一遍代码比看十遍教程都管用。