从MYSQL批量迁移数据到OceanBase
大约 2 分钟
目录结构
- mig.sh
- template.json
- als
- env.txt
- tables.txt
mig.sh
#!/bin/bash
if [ -z "$1" ]; then
echo '请输入需要导入的目录地址!'
exit
fi
dir=$(cd $(dirname $0); pwd)
cd $dir
# 检查env.txt文件是否存在
ls $1 | grep env.txt | grep -V grep
if [ $? -ne 0 ]; then
echo '指定目录下没有env.txt文件,请检查!'
exit
fi
# 检查tables.txt文件是否存在
ls $1 | grep tables.txt | grep -V grep
if [ $? -ne 0 ]; then
echo '指定目录下没有tables.txt文件,请检查!'
exit
fi
DATAX_HOME='/home'
# 设置需要进行模板替换的变量
source $1/env.txt
buildDir=$1/build
logDir=$1/log
rm -rf $buildDir $logDir
mkdir $buildDir $logDir
# 读取需要其按哦已的数据表集合,进行模板变量替换
while read table
do
file_text=$(< ./template.json)
eval "cat <<EOF
$file_text
EOF" > $buildDir/$table.json
done < $1/tables.txt
# ---------------------------多线程处理---------------------------
# 线程数量
threads=10
# mkfifo 创建有名管道
myfifo="/tmp/migfd"
rm -rf $myfifo
mkfifo $myfifo
# 创建文件描述符,以可读(<)和可写(>)关联管道文件
exec 2<>$myfifo
rm -rf $myfifo
# 为文件描述符创建占位信息
for ((i=0;i<${threads};i++))
do
echo >&2
done
# ---------------------------多线程处理---------------------------
# 根据变量startTable判断是否需要跳过开头的某些表,用于断点续跑
skip=1
if [ -z "$startTable" ]; then
skip=0
fi
# 开始表数据迁移
for table in `cat $1/tables.txt`
do
# 跳过已经迁移成功的表
if [ $skip -eq 1 ]; then
if [ "$table" == "$startTable" ]; then
skip=0
else
echo "---------------------------跳过表:${table}---------------------------"
fi
fi
read -u2
{
echo "---------------------------开始迁移表:${table}---------------------------"
python $DATAX_HOME/bin/datax.py $buildDir/$table.json 2>&1 > $logDir/$table.log
if [ $? -ne 0 ]; then
echo "---------------------------迁移表:${table}失败!---------------------------"
fi
echo "---------------------------迁移表:${table}结束---------------------------"
echo >&2
}&
done
wait
# 关闭文件描述符的读写
exec 2<&-
exec 2>&-
template.json
{
"core": {
"transport": {
"channel": {
"speed": {
"byte": 10485760
}
}
}
},
"job": {
"settings": {
"speed": {
"channel": 4,
"byte": 52428800
},
"errorLimit": {
"record": 0,
"percentage": 0.1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "${read_username}",
"password": "${read_password}",
"column": ["*"],
"connection": [
{
"table": ["${table}"],
"jdbcUrl": [
"${read_url}"
]
}
]
}
},
"writer": {
"name": "oceanbasev10writer",
"parameter": {
"obWriteMode": "insert",
"column": ["*"],
"preSql": ["truncate table ${table}"],
"connection": [
{
"table": ["${table}"],
"jdbcUrl": "${write_url}"
}
],
"username": "${write_username}",
"password": "${write_password}",
"writeThreadCount": 10,
"batchSize": 1000,
"memstoreThreshold": 0.9
}
}
}
]
}
}
env.txt
# 源数据库jdbc URL
read_url=jdbc:mysql://20.15.71.68:3306/als93cdev?useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT%2B8
# 源数据库登录用户名
read_username=root
# 源数据库登录密码
read_password=amarsoft
# 目标数据库jdbc URL
write_url=jdbc:oceanbase://20.14.133.11:2883/CREDIT?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true
# 目标数据库登录用户名
write_username=CREDIT@dev_crcsdb#testcluster
# 目标数据库登录密码
write_password=CREDIT@@abcd1234
# 从哪张表开始迁移,用于断点续跑
startTable=
tables.txt
business_apply
customer_info