DBInputFormat para transferir dados de SQL para banco de dados NoSQL



O objetivo deste blog é aprender como transferir dados de bancos de dados SQL para HDFS, como transferir dados de bancos de dados SQL para bancos de dados NoSQL.

Neste blog, exploraremos os recursos e possibilidades de um dos componentes mais importantes da tecnologia Hadoop, ou seja, MapReduce.

Hoje, as empresas estão adotando a estrutura do Hadoop como sua primeira escolha para armazenamento de dados por causa de seus recursos para lidar com grandes dados de forma eficaz. Mas também sabemos que os dados são versáteis e existem em várias estruturas e formatos. Para controlar uma variedade tão grande de dados e seus diferentes formatos, deve haver um mecanismo para acomodar todas as variedades e ainda produzir um resultado eficaz e consistente.





O componente mais poderoso na estrutura do Hadoop é o MapReduce, que pode fornecer o controle sobre os dados e sua estrutura melhor do que suas outras contrapartes. Embora exija sobrecarga de curva de aprendizado e a complexidade de programação, se você pode lidar com essas complexidades, certamente pode lidar com qualquer tipo de dados com o Hadoop.

A estrutura MapReduce divide todas as suas tarefas de processamento em basicamente duas fases: Mapear e Reduzir.



Preparar seus dados brutos para essas fases requer a compreensão de algumas classes e interfaces básicas. A superclasse para esse reprocessamento é Formato de entrada.

o Formato de entrada class é uma das classes principais na API Hadoop MapReduce. Esta classe é responsável por definir duas coisas principais:

  • Divisões de dados
  • Leitor de registro

Divisão de dados é um conceito fundamental na estrutura Hadoop MapReduce que define o tamanho das tarefas de mapa individuais e seu servidor de execução potencial. o Leitor de registro é responsável por ler registros reais do arquivo de entrada e enviá-los (como pares de chave / valor) ao mapeador.



O número de mapeadores é decidido com base no número de divisões. É função do InputFormat criar as divisões. Na maioria das vezes, o tamanho da divisão é equivalente ao tamanho do bloco, mas nem sempre as divisões serão criadas com base no tamanho do bloco HDFS. Depende totalmente de como o método getSplits () de seu InputFormat foi substituído.

Há uma diferença fundamental entre a divisão MR e o bloco HDFS. Um bloco é um pedaço físico de dados, enquanto uma divisão é apenas um pedaço lógico que um mapeador lê. Uma divisão não contém os dados de entrada, apenas uma referência ou endereço dos dados. Uma divisão tem basicamente duas coisas: um comprimento em bytes e um conjunto de locais de armazenamento, que são apenas strings.

Para entender isso melhor, vamos dar um exemplo: Processamento de dados armazenados em seu MySQL usando MR. Como não há conceito de blocos neste caso, a teoria: “as divisões são sempre criadas com base no bloco HDFS”,falha. Uma possibilidade é criar divisões com base em intervalos de linhas em sua tabela MySQL (e isso é o que DBInputFormat faz, um formato de entrada para ler dados de bancos de dados relacionais). Podemos ter k número de divisões consistindo em n linhas.

É apenas para InputFormats baseados em FileInputFormat (um InputFormat para lidar com dados armazenados em arquivos) que as divisões são criadas com base no tamanho total, em bytes, dos arquivos de entrada. No entanto, o tamanho de bloco do FileSystem dos arquivos de entrada é tratado como um limite superior para divisões de entrada. Se você tiver um arquivo menor que o tamanho do bloco HDFS, obterá apenas 1 mapeador para esse arquivo. Se você quiser ter um comportamento diferente, pode usar mapred.min.split.size. Mas, novamente, depende exclusivamente do getSplits () do seu InputFormat.

Temos tantos formatos de entrada pré-existentes disponíveis no pacote org.apache.hadoop.mapreduce.lib.input.

como fazer uma série de objetos

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

o que é .format em python

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

O padrão é TextInputFormat.

Da mesma forma, temos tantos formatos de saída que lêem os dados dos redutores e os armazenam no HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

O padrão é TextOutputFormat.

Quando terminar de ler este blog, você terá aprendido:

  • Como escrever um programa de redução de mapa
  • Sobre os diferentes tipos de InputFormats disponíveis no Mapreduce
  • Qual é a necessidade de InputFormats
  • Como escrever InputFormats personalizados
  • Como transferir dados de bancos de dados SQL para HDFS
  • Como transferir dados de bancos de dados SQL (aqui MySQL) para bancos de dados NoSQL (aqui Hbase)
  • Como transferir dados de um banco de dados SQL para outra tabela em bancos de dados SQL (Talvez isso não seja tão importante se fizermos isso no mesmo banco de dados SQL. No entanto, não há nada de errado em ter um conhecimento do mesmo. Você nunca sabe como pode entrar em uso)

Pré-requisito:

  • Hadoop pré-instalado
  • SQL pré-instalado
  • Hbase pré-instalado
  • Conhecimento básico de Java
  • Conhecimento MapReduce
  • Conhecimento básico da estrutura Hadoop

Vamos entender a declaração do problema que vamos resolver aqui:

arquitetura mvc em java com diagrama

Temos uma tabela de funcionários no banco de dados MySQL em nosso banco de dados relacional Edureka. Agora, de acordo com os requisitos de negócios, temos que mudar todos os dados disponíveis no banco de dados relacional para o sistema de arquivos Hadoop, ou seja, HDFS, banco de dados NoSQL conhecido como Hbase.

Temos muitas opções para fazer esta tarefa:

  • Sqoop
  • Flume
  • MapReduce

Agora, você não deseja instalar e configurar nenhuma outra ferramenta para esta operação. Você fica com apenas uma opção que é a estrutura de processamento MapReduce do Hadoop. A estrutura MapReduce forneceria controle total sobre os dados durante a transferência. Você pode manipular as colunas e colocá-las diretamente em qualquer um dos dois locais de destino.

Nota:

  • Precisamos baixar e colocar o conector MySQL no classpath do Hadoop para buscar tabelas da tabela MySQL. Para fazer isso, baixe o conector com.mysql.jdbc_5.1.5.jar e mantenha-o no diretório Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Além disso, coloque todos os jars Hbase sob o classpath do Hadoop para fazer seu programa MR acessar o Hbase. Para fazer isso, execute o seguinte comando :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

As versões de software que usei na execução desta tarefa são:

  • Hadooop-2.3.0
  • HBase 0.98.9-Hadoop2
  • Lua Eclipse

Para evitar o programa em qualquer problema de compatibilidade, prescrevo aos meus leitores que executem o comando em ambiente semelhante.

DBInputWritable personalizado:

pacote com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBInputWritable implementa Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) throws SQLException // O objeto Resultset representa os dados retornados de uma instrução SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) throws IOException { } public void write (PreparedStatement ps) lança SQLException {ps.setInt (1, id) ps.setString (2, nome) ps.setString (3, dept)} public int getId () {return id} public String getName () {nome de retorno} public String getDept () {departamento de retorno}}

DBOutputWritable personalizado:

pacote com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.io .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable public class DBOutputWritable implementa Writable, DBWritable {private String name private int id private String depto public DBOutputWritable (String name, int id, String depto) {this.name = name this.id = id this.dept = dept} public void readFields (DataInput in) throws IOException {} public void readFields (ResultSet rs) throws SQLException {} public void write (DataOutput out) throws IOException {} public void write (PreparedStatement ps) lança SQLException {ps.setString (1, nome) ps.setInt (2, id) ps.setString (3, dept)}}

Tabela de entrada:

criar banco de dados edureka
criar tabela emp (empid int not null, name varchar (30), dept varchar (20), chave primária (empid))
inserir em valores emp (1, 'abhay', 'desenvolvimento'), (2, 'brundesh', 'teste')
selecione * de emp

Caso 1: transferência do MySQL para HDFS

pacote com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable public class MainDbtohdfs {public static void main (String [] args) lança Exceção {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // classe do driver' jdbc: mysql: // localhost: 3306 / edureka ', // url do banco de dados' root ', // nome do usuário' root ') // senha Job job = new Job (conf) job .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.class) FileOut (DBInputFormat.class) FileOut new Path (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nome da tabela de entrada nulo, nulo, new String [] {'empid', 'name', 'dept'} / / colunas da tabela) Caminho p = novo Caminho (args [0]) FileSystem fs = FileSystem.get (novo URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Este código nos permite preparar ou configurar o inputformat para acessar nosso banco de dados SQL de origem. O parâmetro inclui a classe do driver, a URL tem o endereço do banco de dados SQL, seu nome de usuário e a senha.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // classe do driver 'jdbc: mysql: // localhost: 3306 / edureka', // url do db 'root', // nome do usuário 'root') //senha

Este trecho de código nos permite passar os detalhes das tabelas do banco de dados e configurá-los no objeto de trabalho. Os parâmetros incluem, é claro, a instância do job, a classe gravável customizada que deve implementar a interface DBWritable, o nome da tabela de origem, a condição se houver alguma outra nula, quaisquer parâmetros de classificação senão nula, a lista de colunas da tabela respectivamente.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nome da tabela de entrada nulo, nulo, novo String [] {'empid', 'nome', 'dept'} // colunas da tabela)

Mapeador

pacote com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable public class Map extends Mapper {
mapa de void protegido (chave LongWritable, valor DBInputWritable, ctx do contexto) {try {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (novo texto (nome + '' + id + '' + departamento), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Redutor: redutor de identidade usado

Comando para executar:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Resultado: tabela MySQL transferida para HDFS

hadoop dfs -ls / dbtohdfs / *

Caso 2: transferência de uma tabela no MySQL para outra no MySQL

criando tabela de saída no MySQL

criar a tabela funcionário1 (nome varchar (20), id int, dept varchar (20))

pacote com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable public class Mainonetable_to_other_table {public static void main (String [] args) lança Exceção {Configuração conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // classe de driver 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // nome de usuário' root ') // senha Job job = novo Job (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) job .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (Nul lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nome da tabela de entrada null, null, new String [] {'empid ',' nome ',' dept '} // colunas da tabela) DBOutputFormat.setOutput (trabalho,' funcionário1 ', // nome da tabela de saída novo String [] {' nome ',' id ',' departamento '} // tabela colunas) System.exit (job.waitForCompletion (true)? 0: 1)}}

Este código nos permite configurar o nome da tabela de saída no banco de dados SQL. Os parâmetros são instância do trabalho, nome da tabela de saída e nomes da coluna de saída, respectivamente.

DBOutputFormat.setOutput (job, 'employee1', // nome da tabela de saída new String [] {'name', 'id', 'dept'} // colunas da tabela)

Mapeador: igual ao caso 1

Redutor:

pacote com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable public class Reduce extends Reducer {protected void reduce (Text key, Iterable values, Context ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (linha [0] .toString (), Integer.parseInt (linha [1] .toString ()), linha [2] .toString ()), NullWritable.get ())} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Comando para executar:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Saída: dados transferidos da tabela EMP no MySQL para outra tabela Employee1 no MySQL

Caso 3: transferência da tabela no MySQL para a tabela NoSQL (Hbase)

Criando a tabela Hbase para acomodar a saída da tabela SQL:

criar 'funcionário', 'oficial_info'

Classe do motorista:

pacote Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text public class MainDbToHbase {public static void main (String [] args) lança Exceção {Configuration conf = HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // classe de driver 'jdbc: mysql: // localhost: 3306 / edureka' , // url do banco de dados 'root', // nome do usuário 'root') // senha Job job = novo Job (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('Employee', Reduce.class, job) jobFsetInput.classFormatput (DBBBBBBBytesWritable.class) jobFsetInput.class) TableMapReduceUtil. classe) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // nome da tabela de entrada nulo, nulo, novo String [] {'empid', 'nome', 'dept'} // colunas da tabela) System.exit (job.waitForCompletion (true)? 0: 1)}}

Este pedaço de código permite que você configure a classe da chave de saída que no caso de hbase é ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Aqui estamos passando o nome da tabela hbase e o redutor para atuar na tabela.

TableMapReduceUtil.initTableReducerJob ('funcionário', Reduce.class, trabalho)

Mapeador:

pacote Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable public class Map extends Mapper {private IntWritable one = new IntWritable (1) protected void map (LongWritable id, valor DBInputWritable, contexto contextual) {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Neste pedaço de código, estamos pegando valores dos getters da classe DBinputwritable e, em seguida, passando-os
ImmutableBytesWritable para que alcancem o redutor na forma bytewriatble que Hbase entende.

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), new Text (line + '' + dept ))

Redutor:

pacote Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text public class Reduce extends TableReducer {public void reduce (ImmutableBytesWritable key, Iterable values, Context context) throws IOException, InterruptedException {String [] cause = null // Valores de loop for (Text val: values) {cause = val.toString (). split ('')} // Colocar no HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info' ), Bytes.toBytes ('nome'), Bytes.toBytes (causa [0])) put.add (Bytes.toBytes ('informação_oficial'), Bytes.toBytes ('departamento'), Bytes.toBytes (causa [1 ])) context.write (key, put)}}

Esse trecho de código nos permite decidir a linha e a coluna exatas nas quais armazenaremos os valores do redutor. Aqui, estamos armazenando cada empid em linhas separadas, pois tornamos empid uma chave de linha que seria única. Em cada linha, armazenamos as informações oficiais dos funcionários no grupo de colunas “official_info” nas colunas “nome” e “departamento”, respectivamente.

Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (cause [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('departamento'), Bytes.toBytes (causa [1])) context.write (key, put)

Dados transferidos em Hbase:

escanear empregado

Como podemos ver, fomos capazes de concluir a tarefa de migrar nossos dados de negócios de um banco de dados SQL relacional para um banco de dados NoSQL com sucesso.

No próximo blog, aprenderemos como escrever e executar códigos para outros formatos de entrada e saída.

Continue postando seus comentários, perguntas ou qualquer feedback. Eu adoraria ouvir de você.

Tem alguma questão para nós? Mencione isso na seção de comentários e entraremos em contato com você.

Postagens relacionadas: