RDD usando Spark: o bloco de construção do Apache Spark



Este blog sobre RDD usando Spark fornecerá a você um conhecimento detalhado e abrangente de RDD, que é a unidade fundamental do Spark e como ele é útil.

, A palavra em si é suficiente para gerar uma centelha na mente de cada engenheiro do Hadoop. PARA n na memória ferramenta de processamento que é muito rápido na computação em cluster. Comparado ao MapReduce, o compartilhamento de dados na memória torna RDDs 10-100x Mais rápido do que o compartilhamento de rede e disco e tudo isso é possível por causa dos RDDs (conjuntos de dados distribuídos resilientes). Os pontos principais que enfocamos hoje neste artigo de RDD usando Spark são:

Precisa de RDDs?

Por que precisamos de RDD? -RDD usando Spark





O mundo está evoluindo com e Ciência de Dados por causa do avanço em . Algoritmos baseado em Regressão , , e que corre em Distribuído Computação Iterativa ação moda que inclui a reutilização e compartilhamento de dados entre várias unidades de computação.

como usar goto c ++

O tradicional técnicas necessárias um armazenamento intermediário e distribuído estável como HDFS compreendendo cálculos repetitivos com replicação de dados e serialização de dados, o que tornava o processo muito mais lento. Encontrar uma solução nunca foi fácil.



Aqui é onde RDDs (Conjuntos de dados distribuídos resilientes) vem para o quadro geral.

RDD s são fáceis de usar e fáceis de criar, pois os dados são importados de fontes de dados e colocados em RDDs. Além disso, as operações são aplicadas para processá-los. Eles são um coleção distribuída de memória com permissões como Somente leitura e o mais importante, eles são Tolerante a falhas .



Caso existam partição de dados do o RDD é perdido , pode ser regenerado aplicando o mesmo transformação operação naquela partição perdida em linhagem , em vez de processar todos os dados do zero. Esse tipo de abordagem em cenários de tempo real pode fazer milagres acontecerem em situações de perda de dados ou quando um sistema estiver inativo.

O que são RDDs?

RDD ou ( Conjunto de dados distribuídos resilientes ) é fundamental estrutura de dados no Spark. O termo Resiliente define a capacidade que gera os dados automaticamente ou dados rolando para trás ao Estado original quando ocorre uma calamidade inesperada com probabilidade de perda de dados.

Os dados gravados em RDDs são particionado e armazenado em vários nós executáveis . Se um nó em execução falha no tempo de execução, em seguida, ele instantaneamente obtém o backup do próximo nó executável . É por isso que os RDDs são considerados um tipo avançado de estruturas de dados quando comparados a outras estruturas de dados tradicionais. Os RDDs podem armazenar dados estruturados, não estruturados e semiestruturados.

Vamos seguir em frente com nosso RDD usando o blog do Spark e aprender sobre os recursos exclusivos dos RDDs, que oferecem uma vantagem sobre outros tipos de estruturas de dados.

Características do RDD

  • Em memória (RAM) Computações : O conceito de computação In-Memory leva o processamento de dados a um estágio mais rápido e eficiente onde o desempenho do sistema é atualizado.
  • eu sua avaliação : O termo avaliação preguiçosa diz que o transformações são aplicados aos dados em RDD, mas a saída não é gerada. Em vez disso, as transformações aplicadas são registrado.
  • Persistência : Os RDDs resultantes são sempre reutilizável.
  • Operações de grão grosso : O usuário pode aplicar transformações a todos os elementos em conjuntos de dados por meio de mapa, filtro ou agrupar por operações.
  • Tolerante a falhas : Se houver perda de dados, o sistema pode rolar para trás para o seu Estado original usando o logado transformações .
  • Imutabilidade : Dados definidos, recuperados ou criados não podem ser mudou assim que estiver conectado ao sistema. No caso de você precisar acessar e modificar o RDD existente, você deve criar um novo RDD aplicando um conjunto de Transformação funções no RDD atual ou anterior.
  • Particionamento : É o unidade crucial de paralelismo no Spark RDD. Por padrão, o número de partições criadas é baseado em sua fonte de dados. Você pode até decidir o número de partições que deseja fazer usando partição customizada funções.

Criação de RDD usando Spark

RDDs podem ser criados em três caminhos:

  1. Lendo dados de coleções paralelizadas
val PCRDD = spark.sparkContext.parallelize (Array ('Seg', 'Ter', 'Qua', 'Qui', 'Sex', 'Sáb'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Aplicando transformação em RDDs anteriores
val words = spark.sparkContext.parallelize (Seq ('Spark', 'is', 'a', 'very', 'poderoso', 'language')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Lendo dados de armazenamento externo ou caminhos de arquivo como HDFS ou HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operações realizadas em RDDs:

Existem basicamente dois tipos de operações que são realizadas em RDDs, a saber:

  • Transformações
  • Ações

Transformações : o operações aplicamos em RDDs para filtro, acesso e modificar os dados no RDD pai para gerar um RDD sucessivo é chamado transformação . O novo RDD retorna um ponteiro para o RDD anterior garantindo a dependência entre eles.

Transformações são Avaliações preguiçosas, em outras palavras, as operações aplicadas no RDD que você está trabalhando serão registradas, mas não executado. O sistema lança um resultado ou exceção após acionar o Açao .

Podemos dividir as transformações em dois tipos, conforme abaixo:

  • Transformações estreitas
  • Transformações amplas

Transformações estreitas Aplicamos transformações estreitas em um partição única do RDD pai para gerar um novo RDD, pois os dados necessários para processar o RDD estão disponíveis em uma única partição do pai ASD . Os exemplos de transformações estreitas são:

  • mapa()
  • filtro()
  • flatMap ()
  • partição ()
  • mapPartitions ()

Transformações amplas: Nós aplicamos a ampla transformação em múltiplas partições para gerar um novo RDD. Os dados necessários para processar o RDD estão disponíveis em várias partições do pai ASD . Os exemplos para amplas transformações são:

  • Reduzir por()
  • União()

Ações : Ações instruem o Apache Spark a aplicar computação e passe o resultado ou uma exceção de volta ao driver RDD. Algumas das ações incluem:

  • coletar ()
  • contagem()
  • levar()
  • primeiro()

Vamos aplicar de forma prática as operações em RDDs:

IPL (Indian Premier League) é um torneio de críquete com o seu nível de pico. Então, vamos hoje colocar as mãos no conjunto de dados IPL e executar nosso RDD usando Spark.

  • Primeiramente, vamos baixar um arquivo CSV de dados de IPL. Depois de baixá-lo, ele começa a parecer um arquivo EXCEL com linhas e colunas.

Na próxima etapa, acendemos a centelha e carregamos o arquivo match.csv de seu local, no meu caso meucsvlocalização do arquivo é “/User/edureka_566977/test/matches.csv”

como usar spyder python

Agora vamos começar com o Transformação parte primeiro:

  • mapa():

Nós usamos Transformação do mapa para aplicar uma operação de transformação específica em cada elemento de um RDD. Aqui, criamos um RDD com o nome CKfile onde armazenamos nossocsvArquivo. Devemos criar outro RDD chamado Estados para armazene os detalhes da cidade .

spark2-shell val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val states = CKfile.map (_. split (',') (2)) states.collect (). foreach (println)

  • filtro():

Transformação de filtro, o próprio nome descreve seu uso. Usamos essa operação de transformação para filtrar os dados seletivos de uma coleção de dados fornecidos. Nós aplicamos operação de filtro aqui para obter os recordes dos jogos IPL do ano 2017 e armazene-o no arquivo RDD.

val fil = CKfile.filter (line => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Aplicamos flatMap é uma operação de transformação para cada um dos elementos de um RDD para criar um novoRDD. É semelhante à transformação do mapa. aqui nós aplicamosFlatmappara cuspir os fósforos da cidade de Hyderabad e armazenar os dados emfilRDDRDD.

val filRDD = fil.flatMap (line => line.split ('Hyderabad')). collect ()

  • partição ():

Todos os dados que gravamos em um RDD são divididos em um determinado número de partições. Usamos essa transformação para encontrar o número de partições os dados são realmente divididos em.

val fil = CKfile.filter (line => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

Consideramos MapPatitions como uma alternativa de Map () epara cada() juntos. Usamos mapPartitions aqui para encontrar o numero de linhas temos em nosso arquivo RDD.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • Reduzir por():

Nós usamosReduzir por() em Pares de valor-chave . Usamos essa transformação em nossocsvarquivo para encontrar o jogador com o o melhor homem dos jogos .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

  • União():

O nome explica tudo. Usamos a transformação sindical para Club dois RDDs juntos . Aqui estamos criando dois RDDs, ou seja, fil e fil2. fil RDD contém os registros de correspondências de IPL de 2017 e fil2 RDD contém o registro de correspondências de IPL de 2016.

val fil = CKfile.filter (line => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Vamos começar com o Açao parte onde mostramos a saída real:

  • collect ():

Coletar é a ação que usamos para exibir o conteúdo no RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / partidas.csv') CKfile.collect.foreach (println)

  • contagem():

Contagemé uma ação que usamos para contar o número de registros presente no RDD.Aquiestamos usando essa operação para contar o número total de registros em nosso arquivo match.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.count ()

  • levar():

Take é uma operação Action semelhante a collect, mas a única diferença é que pode imprimir qualquer número seletivo de linhas conforme solicitação do usuário. Aqui, aplicamos o seguinte código para imprimir o dez principais relatórios principais.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. take (10) .foreach (println)

  • primeiro():

First () é uma operação de ação semelhante a collect () e take ()istousado para imprimir o relatório superior é a saída Aqui, usamos a primeira operação () para encontrar o número máximo de partidas jogadas em uma determinada cidade e temos Mumbai como saída.

qual ide é melhor para java
val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val states = CKfile.map (_. split (',') (2)) val Scount = states.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Para tornar nosso processo de aprendizagem RDD usando Spark, ainda mais interessante, eu vim com um caso de uso interessante.

RDD usando Spark: caso de uso de Pokémon

  • Primeiramente, Vamos baixar um arquivo Pokemon.csv e carregá-lo no spark-shell como fizemos no arquivo Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokémons estão realmente disponíveis em uma grande variedade, vamos encontrar algumas variedades.

  • Removendo o esquema do arquivo Pokemon.csv

Podemos não precisar do Esquema do arquivo Pokemon.csv. Portanto, nós o removemos.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Encontrando o número de partições nosso pokemon.csv é distribuído em.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Pokémon de água

Encontrando o número de pokémons de água

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Pokémon de fogo

Encontrando o número de Pokémon de fogo

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Também podemos detectar o população de um tipo diferente de pokémon usando a função de contagem
WaterRDD.count () FireRDD.count ()

  • Já que gosto do jogo de estratégia defensiva vamos encontrar o Pokémon com defesa máxima.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Altitude_Defença:' + defenceList.max ())

  • Nós sabemos o máximo valor de força de defesa mas não sabemos qual pokémon é. então, vamos descobrir qual é aquele Pokémon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Ordenação [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Agora vamos resolver o pokémon com menos defesa
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Agora vamos ver o Pokémon com um estratégia menos defensiva.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2.filter (line =>! line.equals (NoHeadPoker2) valer defWith2 .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Ordenação [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

Então, com isso, chegamos ao fim deste RDD usando o artigo do Spark. Espero que tenhamos esclarecido o seu conhecimento sobre os RDDs, seus recursos e os vários tipos de operações que podem ser executados neles.

Este artigo é baseado em foi projetado para prepará-lo para o Exame de certificação de desenvolvedor Cloudera Hadoop e Spark (CCA175). Você obterá um conhecimento aprofundado sobre o Apache Spark e o ecossistema do Spark, que inclui Spark RDD, Spark SQL, Spark MLlib e Spark Streaming. Você obterá conhecimentos abrangentes sobre a linguagem de programação Scala, HDFS, Sqoop, Flume, Spark GraphX ​​e Sistema de mensagens como Kafka.