2021-10-31
resultMap
#
- 当返回结果时(未显示指定resultmap),mybatis会创建一个ResultMap,将属性名映射到实体类的属性上,如果列名和属性名不能匹配时,可以在select语句中设置别名来完成匹配,如
user_id as "id"
。
属性
#
- id 唯一标识,别的地方引用
- type 类的完全限定命名,或者类型别名。
- autoMapping 如果设置这个属性,mybatis将会为本结果映射开启或关闭自动映射。这个属性会覆盖全局的autoMappingBehavior。
元素
#
-
constructor
用于在实例化类时,注入结果到构造方法中
idArg
ID参数;标记出作为ID的结果可以帮助提高整体性能
arg
将被注入到构造方法的一个普通结果
-
id
标记作为ID的结果可以帮助提高整体性能
-
result
注入到实体类的普通结果
-
association
关联一个复杂的类型,即数据库的一对一。
-
collection
数据库的一对多
-
discriminator
使用结果值来决定使用哪个resultMap
-
id和result唯一的不同是,id元素对应的属性会被标记为对象的标识符,在比较对象实例时使用,能提高整体的性能。
id&result
#
property
映射到列的属性,如果实体类有这个名字的property,会先使用该属性。
column
数据库中的列名,或者是列的别名。
javaType
一个Java类的全限定命名,或者一个类型别名。如果映射到实体类,mybatis通常可以推断类型。如果映射到HashMap,则应该明确指定javaType来保证类型一致。
jdbcType
JDBC类型,只有可为空的列上能指定JDBC类型。
typeHandler
指定类型处理器,需要是全限定命名或者类型别名。
association
#
<resultMap id="blogResult" type="Blog">
<id property="id" column="blog_id" />
<result property="title" column="blog_title"/>
<!-- 这里的author指的是Blog中的属性 -->
<association property="author" column="blog_author_id" javaType="Author" resultMap="authorResult"/>
</resultMap>
<resultMap id="authorResult" type="Author">
<id property="id" column="author_id"/>
<result property="username" column="author_username"/>
<result property="password" column="author_password"/>
</resultMap>
<!-- 或者使用下面的形式-->
<resultMap id="blogResult" type="Blog">
<id property="id" column="blog_id" />
<result property="title" column="blog_title"/>
<association property="author" javaType="Author">
<id property="id" column="author_id"/>
<result property="username" column="author_username"/>
<result property="password" column="author_password"/>
</association>
</resultMap>
<select id="selectBlog" resultMap="blogResult">
select
B.id as blog_id,
B.title as blog_title,
B.author_id as blog_author_id,
A.id as author_id,
A.username as author_username,
A.password as author_password,
from Blog B left outer join Author A on B.author_id = A.id
where B.id = #{id,javaType}
</select>
collection
#
<resultMap id="blogResult" type="Blog">
<id property="id" column="blog_id" />
<result property="title" column="blog_title"/>
<collection property="posts" ofType="Post" resultMap="blogPostResult" columnPrefix="post_"/>
</resultMap>
<resultMap id="blogPostResult" type="Post">
<id property="id" column="id"/>
<result property="subject" column="subject"/>
<result property="body" column="body"/>
</resultMap>
<!-- 或者使用下面这种形式 -->
<resultMap id="blogResult" type="Blog">
<id property="id" column="blog_id" />
<result property="title" column="blog_title"/>
<collection property="posts" ofType="Post">
<id property="id" column="post_id"/>
<result property="subject" column="post_subject"/>
<result property="body" column="post_body"/>
</collection>
</resultMap>
<select id="selectBlog" resultMap="blogResult">
select
B.id as blog_id,
B.title as blog_title,
B.author_id as blog_author_id,
P.id as post_id,
P.subject as post_subject,
P.body as post_body,
from Blog B
left outer join Post P on B.id = P.blog_id
where B.id = #{id}
</select>
2021-10-28
#服务注册:
curl -X POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?serviceName=xiaoxiang&ip=192.168.1.2&port=8080'
#服务发现:
curl -X GET 'http://127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=nacos.naming.serviceName'
#发布配置:
curl -X POST "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test&content=HelloWorld"
#获取配置:
curl -X GET "http://127.0.0.1:8848/nacos/v1/cs/configs?dataId=nacos.cfg.dataId&group=test"
一个nacos简单案例,之后会接着补充的:
nacos: naocs-demon (gitee.com)
2021-10-27
-
agent就是一个Flume的实例,event是flume定义的一个数据流传输的最小单元。source指的是数据的来源和方式,channel是一个数据的缓冲池,sink定义了数据的输出的方式和目的地。一个完整的agent包含了必须的三个组件source、channel和sink。
-
启动./bin/flume-ng agent -n agent_name -c conf -f conf/flume-conf.properties.template
。
- agent 表示启动一个agent,-n 指定agent的名字,-f配置文件的位置 ,-c 配置文件夹的位置。
-
一个配置文件的例子
#设置各个组件的名称
a1.sources=r1
a1.sinks=k1
a1.channels=c1
#监听的主机和端口
a1.sources.r1.type=syslogudp
al.sources.r1.bind=10.2.4.31
a1.sources.r1.port=5145
a1.sources.r1.keepFields=true
#配置Sink
a1.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers=10.2.4.31:9092
a1.sinks.k1.kafka.topic=rawlog
#配置channel
a1.channels.c1.type=memory
#内存channel的容量大小为1000
a1.channels.c1.capacity=1000
#source和sink从内存每次传输的event数量
a1.channels.c1.transactionCapacity=1000
#把source绑定到channel上,一个source可以连接多个channel
a1.sources.r1.channels=c1
#把sink绑定到channel上,一个sink只能连接一个channel
a1.sinks.k1.channel=c1
#启动
#bin/flume-ng agent -c conf -f example.conf --name a1 -Dflume.root.logger=INFO,console
- 通过linux命令发送一条log
logger --udp --port 5140 --server 172.27.32.2 "hello world"
2021-10-21
介绍
#
- kafka中的消息以主题为单位进行归类,生产者负责将消息发送到特定的topic,消费者负责订阅topic并进行消费。
- topic是一个逻辑上的概念,它可以细分为多个partition(分区),一个partition只属于单个topic(主题)。
- 同一个topic下的不同的partition存放的消息是不同的。消息在被追加到分区文件中会被分配一个特定的偏移量(offset)。offset是消息在partition中唯一的标识,kafka通过offset来保证消息在分区内的顺序性。offset并不跨越分区,也就是说kafka保证的是partition有序而非topic有序。
- partition可以分布在不同的broker上,也就是说一个topic可以横跨多个broker。
- 每一条消息被发送到broker前,会根据分区规则选择存到哪个具体的分区,如果分区规则设置的合理,则所有的消息都可以均匀的分配到不同的分区中。如果一个topic只有一个分区,那么这个这个分区文件所在的机器的I/O将会成为这个topic的性能瓶颈。
- kafka为分区引入了副本(replica)机制,同一个分区的不同副本中保存的是相同的消息。副本之间是一主多从的关系,leader副本负责处理读写请求,follower副本只负责于leader副本的消息同步,当leader副本出现故障时,从follower中重新选举的新leader对外提供服务。
日志存储
#
- kafka中的消息是以topic为基本单位进行归类的,各个topic逻辑上相互独立,每个topic可以分为一个或多个partition,分区的数量可以在topic创建时指定,也可以在之后修改。每个消息在发送的时候会根据分区规则被追加到指定的partition中,partition中的每条消息都会被分配一个唯一的序列号,即偏移量(offset)。
- 不考虑多副本的情况,一个分区对应一个日志(Log)。为了防止Log过大,kafka引入了日志分段(LogSegment)的概念,将Log切分为多个LogSegment,相当于一个大型文件被平均分为多个相对较小的文件。Log在物理上以文件夹的形式存储,每隔LogSegment对应于磁盘上的一个日志文件和两个索引文件以及可能的其他文件。
- Log对应的命名形式为
topic-partition
的目录。
- 为了便于消息的检索,每隔LogSegment中的日志文件(以
.log
文件结尾)都有对应的两个索引文件:偏移量索引文件(.index
)和时间戳索引文件(.timeindex
),每个logsegment都有一个基准偏移量baseoffset,用来表示当前logsegment中第一条消息的offset,偏移量是一个64位长整型数,日志文件和两个索引文件都是根据准偏移量来命名的,名称固定为20个数字,没有达到的位数则用0填充。
- kafka消费者的唯一是保存在内部topic
__consumer_offsets
中的。
消费者
#
- 一个消费者只属于一个消费组,每一个分区只能被一个消费组中的一个消费者所消费。
命令行
#
#启动服务
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
#创建有4个partition的topic
kafka-topics.sh --bootstrap-server 10.2.4.31:9092 --create --topic kuangpf-topic --partitions 4 [--replication-factor 1] [--if-not-exists]
#展示topic分区副本的分配细节
kafka-topics.sh --bootstrap-server 10.2.4.31:9092 --describe --topic kuangpf-topic
#向topic中写入event
bin/kafka-console-producer.sh --topic test --bootstrap-server localhost:9092
#读取events
bin/kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092
#获取当前可用的topics
kafka-topics.sh --bootstrap-server 10.2.4.31:9092 --list [--if-exists]
#增加分区数,分区数只能增加不能减少
kafka-topics.sh --bootstrap-server 10.2.4.31:9092 --alter --topic kuangpf-topic --partitions 5
#查看动态配置
kafka-configs.sh --bootstrap-server 10.2.4.31:9092 --describe --entity-type topics --entity-name kuangpf-topic
#添加配置,多个配置用逗号隔开
kafka-configs.sh --bootstrap-server 10.2.4.31:9092 --alter --entity-type topics --entity-name kuangpf-topic --add-config max.message.bytes=10000
#删除某个动态配置
kafka-configs.sh --bootstrap-server 10.2.4.31:9092 --alter --entity-type topics --entity-name kuangpf-topic --delete-config max.message.bytes
#删除一个topic
kafka-topics.sh --bootstrap-server 10.2.4.31:9092 --delete --topic kuangpf-topic [--if-exists]
#将消费组的topic的偏移量移到最后
kafka-consumer-groups.sh --bootstrap-server 10.2.4.31:9092 --group log-process --topic a36cde73-88e0-4398-b7ee-7246f65c6cfe --reset-offsets --to-latest --execute
#性能测试工具
#发送100万条数据,每条消息1024字节,吞吐量大于-时表示当发送的吞吐量大于该值时就会被阻塞一段时间,小于0则不限流
#records sent表示测试时发送的消息总数,records/sec和MB、sec表示吞吐量,avg latency表示消息处理的平均耗时,max latency表示消息的最大耗时。50th表示50%的消息处理耗时,其他的类推。
kafka-producer-perf-test.sh --topic k1 --num-records 100000 --record-size 1024 --throughput -1 --producer-props bootstrap.servers=10.2.4.31:9092
#消费100万条数据
#start.time起始时间、end.time结束时间、data.consumed.in.MB消费的消息总量、每秒拉去的消息的字节大小fetch.MB.sec、fetch.nMsg.sec每秒拉去的消息个数
kafka-consumer-perf-test.sh --topic k1 --messages 1000000 --broker-list 10.2.4.31:9092
java程序案例
#
public static void main(String[] args) {
Properties props = new Properties();
//给程序一个唯一标识符,用于区分和同一个kafka对话的其他程序。
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
//kafka的地址
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
//使用topology构建器构建一个topology
final StreamsBuilder builder = new StreamsBuilder();
//创建源流,KStream会不断地从my-input中获取数据
KStream<String, String> source = builder.stream("my-input");
//将输入的键和值随意更改,类型也可以修改,返回新的键值对
source.map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@Override
public KeyValue<String, String> apply(String key, String value) {
log.info("====> key {}, value{}", key, value);
return new KeyValue<>(key, value);
}
});
//将数据写入到my-output,TopicNameExtractor可以动态指定topic,但topic必须存在
source.to(new TopicNameExtractor<String, String>() {
@Override
public String extract(String key, String value, RecordContext recordContext) {
log.info("---->key {}, value {},recordContext {}", key, value, recordContext);
return "my-output";
}
});
final Topology topology = builder.build();
//使用上面的两个组件topology和props来构建streams客户端。
final KafkaStreams streams = new KafkaStreams(topology, props);
//程序结束前能关闭连接
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
@Override
public void run() {
streams.close();
}
});
//执行此客户端
streams.start();
}
2021-09-27
psql --username highgo --host 127.0.0.1 --port 5866 --password
登录到数据库
- -U (–username), -h(–host), -p(–port), -W(–password)
#关闭三权分立,能解决很多的权限问题,生产环境不建议关闭
#以syssso登录,修改后重启,默认为on,可以改为off
select set_secure_param('hg_sepofpowers','off');
#修改postgresql.conf
listen_addresses = '0.0.0.0'
#修改pg_hba.conf,加上
host all all 0.0.0.0/0 sm3
postgresql
#
# 配置步骤
adduser postgres
mkdir /usr/local/pgsql/data
chown postgres /usr/local/pgsql/data
su - postgres
/usr/local/pgsql/bin/initdb -D /usr/local/pgsql/data
/usr/local/pgsql/bin/postgres -D /usr/local/pgsql/data >logfile 2>&1 &
/usr/local/pgsql/bin/createdb test
/usr/local/pgsql/bin/pgsql test
#启动psql
postgres -D /usr/local/pgsql/data >logfile 2>&1 &
pg_ctl start -l logfile
#psql登录后
\dn 查看模式
\l 查看数据库
\dt myschema.* 列出table
\dg 列出所有角色
#密码永不失效 valid until 'infinity';
create user davide with createdb password '1234!@#$QWERqwer' valid until '2005-01-01';
alter role davide with password '1234qwer!';
#设置到期时间
alter role syssso valid until 'infinity';
#创建数据库,并指定拥有者
CREATE DATABASE sales [OWNER salesapp];
#创建数据库
create database koal_database
#创建一个模式
CREATE SCHEMA myschema;
#将数据库的权限分给别人
grant all privileges on database koal_database to koal;
#移除权限
revoke all privileges on database koal_database from koal;
#导出一个数据库
pg_dump mydb > db.sql
#导入一个脚本到新建的数据库newdb
psql -d newdb -f db.sql
- psql有些不属于sql的内部命令,它们以反斜线开头,获取帮助
\h
,退出命令\q
。