博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka与.net core(二)zookeeper
阅读量:5268 次
发布时间:2019-06-14

本文共 6682 字,大约阅读时间需要 22 分钟。

1.zookeeper简单介绍

1.1作用

zookeeper的作用是存储kafka的服务器信息,topic信息,和cunsumer信息。如下图:

而zookeeper是个什么东西呢?简单来说就是一个具有通知机制的文件系统,引用网路上的一张图

可以看出来zookeeper是一个树形的文件结构,我们可以自定义node与node的值,并对node进行监视,当node的结构或者值变化时,我们可以收到通知。

1.2node类型

1)PERSISTENT-持久化目录节点
客户端与zookeeper断开连接后,该节点依旧存在
2)PERSISTENT_SEQUENTIAL-持久化顺序编号目录节点
客户端与zookeeper断开连接后,该节点依旧存在,只是Zookeeper给该节点名称进行顺序编号
3)EPHEMERAL-临时目录节点
客户端与zookeeper断开连接后,该节点被删除
4)EPHEMERAL_SEQUENTIAL-临时顺序编号目录节点
客户端与zookeeper断开连接后,该节点被删除,只是Zookeeper给该节点名称进行顺序编号

2.zookeeper命令操作

连接zookeeper

[root@iz2zei2y693gtrgwlibzlwz ~]# zkCli.sh -server ip:2181

查看zookeeper的所有节点

ls /

查看某个节点的子节点

ls /brokers

创建节点

create /testaa dataaaa

获取节点的值

get /testaa

设置节点值

set /testaa aaabbb

删除节点

delete /testaa

这么看来实际上zookeeper跟数据库类似也是CURD操作,我们再来看看zookeeper的安全控制ACL

3.zookeeper的ACL

3.1ZK的节点有5种操作权限:

CREATE、READ、WRITE、DELETE、ADMIN 也就是 增、删、改、查、管理权限,这5种权限简写为crwda(即:每个单词的首字符缩写)

3.2身份的认证有4种方式:

world:默认方式,相当于全世界都能访问
auth:代表已经认证通过的用户(cli中可以通过addauth digest user:pwd 来添加当前上下文中的授权用户)
digest:即用户名:密码这种方式认证,这也是业务系统中最常用的
ip:使用Ip地址认证

3.3ACL实例

默认创建的时world方式,任何人都能访问,我们新建一个测试node节点
[zk: 3:2181(CONNECTED) 11] create /cys cys

访问一一下

[zk: 3:2181(CONNECTED) 13] get /cys

查看一下Acl

[zk: 3:2181(CONNECTED) 15] getAcl /cys

下面我们设置一下他的用户

 命令为:

1)增加一个认证用户
addauth digest 用户名:密码明文
eg. addauth digest user1:password1
2)设置权限
setAcl /path auth:用户名:密码明文:权限
eg. setAcl /test auth:user1:password1:cdrwa
3)查看Acl设置
getAcl /path

 具体操作如下:

addauth digest cys:123456setAcl /cys auth:cys:123456:crwda

我们ctrl+c退出zkCli,重新连接一下,然后查询

get /cys

结果如下:

提示我们认证失败,我们登陆一下

addauth digest cys:123456

结果如下:

我们在查看一下/cys节点的Acl

 

可以看出来用户cys对应的密码(加密后的)和权限cdrwa

4..net core 操作

4.1新建server项目,引入ZookeeperNetEx这个nuget包

Server端代码

using org.apache.zookeeper;using org.apache.zookeeper.data;using System;using System.Collections.Generic;using System.Text;using System.Threading.Tasks;namespace ConsoleApp3{    class Program    {        static void Main(string[] args)        {            while (true)            {                Console.WriteLine("输入path");                var path = Console.ReadLine();                string address = "39.**.**.**:2181";                ZooKeeper _zooKeeper = new ZooKeeper(address, 1000 * 1000, null);                ZooKeeper.States states = _zooKeeper.getState();                //是否存在                Task
stat = _zooKeeper.existsAsync(path); stat.Wait(); if (stat.Result != null && stat.Status.ToString().ToLower() == "RanToCompletion".ToLower()) { //已存在 Console.WriteLine($"{path}已存在"); } else { Console.WriteLine("输入data"); var data = Console.ReadLine(); //创建 Task
task = _zooKeeper.createAsync(path, System.Text.Encoding.UTF8.GetBytes(data), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); task.Wait(); if (!string.IsNullOrEmpty(task.Result) && task.Status.ToString().ToLower() == "RanToCompletion".ToLower()) { Console.WriteLine($"{path}创建成功"); } } Console.WriteLine("输入set data"); var dataA = Console.ReadLine(); //set值 Task
statA = _zooKeeper.setDataAsync(path, System.Text.Encoding.UTF8.GetBytes(dataA)); statA.Wait(); if (statA.Result != null && statA.Status.ToString().ToLower() == "RanToCompletion".ToLower()) { Console.WriteLine("set 成功"); } Console.WriteLine("输入子path"); var childpath = Console.ReadLine(); Console.WriteLine("输入子data"); var childdata = Console.ReadLine(); Task
childtask = _zooKeeper.createAsync(childpath, System.Text.Encoding.UTF8.GetBytes(childdata), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); childtask.Wait(); if (!string.IsNullOrEmpty(childtask.Result) && childtask.Status.ToString().ToLower() == "RanToCompletion".ToLower()) { Console.WriteLine($"{childpath}创建成功"); } Console.ReadLine(); _zooKeeper.closeAsync().Wait(); } ////删除 //Console.WriteLine("输入delete path"); //var pathB = Console.ReadLine(); //Task taskA = _zooKeeper.deleteAsync(pathB); //taskA.Wait(); //if (taskA.Status.ToString().ToLower() == "RanToCompletion".ToLower()) //{ // Console.WriteLine("delete 成功"); //} ////获取数据 //Task
dataResult = _zooKeeper.getDataAsync(path, new NodeWatcher()); //dataResult.Wait(); //if (dataResult.Result != null && dataResult.Status.ToString().ToLower() == "RanToCompletion".ToLower()) //{ // Console.WriteLine(Encoding.UTF8.GetString(dataResult.Result.Data)); //} } }}

4.2新建client项目,引入ZookeeperNetEx这个nuget包

客户端代码

using org.apache.zookeeper;using org.apache.zookeeper.data;using System;using System.Collections.Generic;using System.Threading;using System.Threading.Tasks;namespace Client{    class Program    {        static void Main(string[] args)        {            Console.WriteLine("输入path");            var path = Console.ReadLine();            string address = "39.**.**.**:2181";            ZooKeeper _zooKeeper = new ZooKeeper(address, 10 * 1000, new DefaultWatcher());        //用户登陆            _zooKeeper.addAuthInfo("digest", System.Text.Encoding.Default.GetBytes("cys:123456"));            //获取child            var getresult = _zooKeeper.getChildrenAsync(path, true);            getresult.Wait();            //获取数据            Task
dataResult = _zooKeeper.getDataAsync(path,true); dataResult.Wait(); Thread.Sleep(100000); } } public class DefaultWatcher : Watcher { internal static readonly Task CompletedTask = Task.FromResult(1);        ///
        /// 接收通知        ///         ///
        ///
        public override Task process(WatchedEvent @event) { Console.WriteLine(string.Format("接收到ZooKeeper服务端的通知,State是:{0},EventType是:{1},Path是:{2}", @event.getState(), @event.get_Type(), @event.getPath() ?? string.Empty)); return CompletedTask; } }}

这样当server端操作的时候,client端会通过watcher收到通知

转载于:https://www.cnblogs.com/chenyishi/p/10249731.html

你可能感兴趣的文章
easyUI定区关联快递员js代码
查看>>
c#的dllimport使用方法详解,调试找不到dll的方法
查看>>
创建一个自定义的Application类
查看>>
UDP的最大报文长度
查看>>
Ubuntu 16.04使用chrome闪屏
查看>>
Android必学-异步加载+Android自定义View源码【申明:来源于网络】
查看>>
行政区划代码
查看>>
自定义不等高的cell-(storyboard)
查看>>
Cracking the code interview
查看>>
linux命令 rpm
查看>>
OMG: daily scrum nine
查看>>
【蓝桥杯】历届试题 连号区间数(运行超时)
查看>>
交换机练习的心得
查看>>
JavaScript数组学习总结
查看>>
node.js
查看>>
配置 Squid Server
查看>>
PHP学习笔记之批量删除
查看>>
jQuery的弹出窗口插件colorbox
查看>>
第17章 Redis概述
查看>>
MyBatis的关联映射和动态SQL
查看>>