Transformação cumulativa com estado no Apache Spark Streaming



Esta postagem do blog discute transformações com estado no Spark Streaming. Saiba tudo sobre o rastreamento cumulativo e o aprimoramento de habilidades para uma carreira no Hadoop Spark.

Contribuição de Prithviraj Bose

conceitos básicos de introdução à programação sas

Em meu blog anterior, discuti as transformações com monitoração de estado usando o conceito de janelas do Apache Spark Streaming. Você pode ler isso Aqui .





Nesta postagem, vou discutir as operações com estado cumulativas no Apache Spark Streaming. Se você for novo no Spark Streaming, recomendo fortemente que leia meu blog anterior para entender como funciona o janelamento.

Tipos de transformação stateful no Spark Streaming (continuação ...)

> Acompanhamento cumulativo

Nós tínhamos usado o reduzByKeyAndWindow (...) API para rastrear os estados das chaves, no entanto, o janelamento apresenta limitações para certos casos de uso. E se quisermos acumular os estados das chaves em vez de limitá-los a uma janela de tempo? Nesse caso, precisaríamos usar updateStateByKey (…) FOGO.



Essa API foi introduzida no Spark 1.3.0 e se tornou muito popular. No entanto, essa API tem alguma sobrecarga de desempenho, seu desempenho se degrada à medida que o tamanho dos estados aumenta com o tempo. Eu escrevi um exemplo para mostrar o uso desta API. Você pode encontrar o código Aqui .

Spark 1.6.0 introduziu uma nova API mapWithState (…) que resolve as sobrecargas de desempenho impostas por updateStateByKey (…) . Neste blog, irei discutir essa API específica usando um programa de amostra que escrevi. Você pode encontrar o código Aqui .

Antes de mergulhar em uma explicação passo a passo do código, vamos poupar algumas palavras sobre o ponto de verificação. Para qualquer transformação com estado, o ponto de verificação é obrigatório. O ponto de verificação é um mecanismo para restaurar o estado das chaves caso o programa do driver falhe. Quando o driver é reiniciado, o estado das chaves é restaurado a partir dos arquivos de ponto de verificação. Os locais de checkpoint são geralmente HDFS ou Amazon S3 ou qualquer armazenamento confiável. Ao testar o código, também é possível armazenar no sistema de arquivos local.



No programa de amostra, ouvimos o fluxo de texto de soquete em host = localhost e porta = 9999. Ele transforma o fluxo de entrada em (palavras, número de ocorrências) e rastreia a contagem de palavras usando a API 1.6.0 mapWithState (…) . Além disso, as chaves sem atualizações são removidas usando StateSpec.timeout API. Estamos fazendo checkpoint no HDFS e a frequência de checkpoint é a cada 20 segundos.

Vamos primeiro criar uma sessão Spark Streaming,

Spark-streaming-session

Nós criamos um checkpointDir no HDFS e, em seguida, chame o método do objeto getOrCreate (…) . o getOrCreate API verifica o checkpointDir para ver se há algum estado anterior para restaurar, se houver, ele recria a sessão Spark Streaming e atualiza os estados das chaves dos dados armazenados nos arquivos antes de prosseguir com os novos dados. Caso contrário, ele cria uma nova sessão Spark Streaming.

o getOrCreate recebe o nome do diretório do ponto de verificação e uma função (que chamamos createFunc ) cuja assinatura deve ser () => StreamingContext .

para que mongodb é usado

Vamos examinar o código interno createFunc .

Linha # 2: criamos um contexto de streaming com nome de trabalho para “TestMapWithStateJob” e intervalo de lote = 5 segundos.

Linha # 5: defina o diretório do ponto de verificação.

Linha # 8: Defina a especificação de estado usando a classe org.apache.streaming.StateSpec objeto. Primeiro definimos a função que rastreará o estado e, em seguida, definimos o número de partições para os DStreams resultantes que serão gerados durante as transformações subsequentes. Por fim, definimos o tempo limite (para 30 segundos) em que, se qualquer atualização de uma chave não for recebida em 30 segundos, o estado da chave será removido.

Linha 12 #: configurar o fluxo de soquete, nivelar os dados de lote de entrada, criar um par de valor-chave, chamar mapWithState , defina o intervalo do ponto de verificação para 20s e finalmente imprima os resultados.

A estrutura do Spark chama th e createFunc para cada chave com o valor anterior e o estado atual. Calculamos a soma e atualizamos o estado com a soma cumulativa e, finalmente, retornamos a soma da chave.

saltstack vs fantoche vs chef

Fontes do Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Tem alguma questão para nós? Mencione isso na seção de comentários e entraremos em contato com você.

Postagens relacionadas:

Comece a usar Apache Spark e Scala

Transformações com estado com janelas no fluxo do Spark