Baixe o app para aproveitar ainda mais
Prévia do material em texto
UNIVERSIDADE ESTÁCIO DE SÁ ESPECIALIZAÇÃO CIÊNCIA DE DADOS E BIG DATA ANALYTICS MAPREDUCE JAVA E HADOOP HENRIQUE MATEUS FRANZE Trabalho da disciplina NPG2060 Tutor: Prof. DENIS GONCALVES COPLE ARTHUR NOGUEIRA 2019 7 Objetivo: Criar um MapReduce Java e executar no Hadoop utilizando a sandbox Hortonworks HDP em máquina virtual. Proposta: Mapear todos os termos de um pequeno texto usando o termo primitivo “stemizado” - stem extraído com processamento natural de linguagem - PNL como chave. Em seguida reduzir verificando a frequência que cada stem aparece. Filtrando-se os stems mais frequentes temos um método simplório para extrair as palavras chaves de um determinado texto Para o processamento de linguagem natural foi utilizado a API Apache Lucene (https://lucene.apache.org/) que fornece as funcionalidades utilizadas. Como fonte de dados do exemplo apresentado nesse documento foi usado o texto da página 3 da apostila de tecnologias avançadas - introdução ao Hadoop: Figura 1 – Texto extraído da apostila usado como entrada. Termos Processamento Linguagem Natural usados: PLN – processamento de linguagem natural Normalização - transformação de letras maiúsculas para minúsculas, remoção de caracteres especiais, remoção de tags HTML/Javascript/CSS, dentre outras. Tokenização - O processo de tokenização tem como objetivo separar palavras ou sentenças em unidades: “Esta é uma sentença!” - [“esta”, “é”, “uma”, “sentença”] Remoção de stopwords - Esse método consiste em remover palavras muito frequentes, tais como “a”, “de”, “o”, “da”, “que”, “e”, “do” entre outras, pois na maioria das vezes não são informações relevantes para a construção do modelo Stemização - O processo de stemização (do inglês, stemming) consiste em reduzir uma palavra ao seu radical. A palavra “meninas” se reduziria a “menin”, assim como “meninos” e “menininhos”. As palavras “gato”, “gata”, “gatos” e “gatas” reduziriam-se para “gat”. Stem – o produto do processo de Stemização “gato”, “gata”, “gatos” e “gatas” produzem o stem “gat” Mapper: Nessa classe ocorre o tratamento da linguagem natural e o stem é extraído dos tokens da seguinte forma: 1. Normalização Removemos toda a pontuação; 2. Tokenização; 3. Aplica-se um filtro para transformar todas as palavras em minúsculas; 4. Remoção de stopwords; 5. Escrita do par key, value (stem, token) para o contexto; Código fonte StemMapper.java: package trabalhoTecAvancadas; import java.io.IOException; import java.io.StringReader; import java.util.HashSet; import java.util.Set; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.lucene.analysis.ASCIIFoldingFilter; import org.apache.lucene.analysis.LowerCaseFilter; import org.apache.lucene.analysis.StopFilter; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.br.BrazilianAnalyzer; import org.apache.lucene.analysis.br.BrazilianStemFilter; import org.apache.lucene.analysis.standard.ClassicTokenizer; import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; import org.apache.lucene.util.Version; public class StemMapper extends Mapper<LongWritable, Text, Text, Text> { public static String stem(String term) throws IOException { TokenStream tokenStream = null; try { // tokenize tokenStream = new ClassicTokenizer(Version.LUCENE_36, new StringReader(term)); // stem tokenStream = new BrazilianStemFilter(tokenStream); // add each token in a set, so that duplicates are removed Set<String> stems = new HashSet<String>(); CharTermAttribute token = tokenStream.getAttribute(CharTermAttribute.class); tokenStream.reset(); while (tokenStream.incrementToken()) { stems.add(token.toString()); } // if no stem or 2+ stems have been found, return null if (stems.size() != 1) { return null; } String stem = stems.iterator().next(); // if the stem has non-alphanumerical chars, return null if (!stem.matches("[a-zA-Z0-9-]+")) { return null; } return stem; } finally { if (tokenStream != null) { tokenStream.close(); } } } @Override public void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String input = value.toString(); //remove pontuacao input = input.replaceAll("[\\p{Punct}&&[^'-]]+", " "); // usa lucene para filtrar o conteúdo que não é chave. TokenStream tokenStream = null; // tokenize input tokenStream = new ClassicTokenizer(Version.LUCENE_36, new StringReader(input)); // to lowercase tokenStream = new LowerCaseFilter(Version.LUCENE_36, tokenStream); // convert any char to ASCII tokenStream = new ASCIIFoldingFilter(tokenStream); // remove portuguese stop words tokenStream = new StopFilter(Version.LUCENE_36, tokenStream, BrazilianAnalyzer.getDefaultStopSet()); CharTermAttribute token = tokenStream.getAttribute(CharTermAttribute.class); tokenStream.reset(); while (tokenStream.incrementToken()) { String term = token.toString(); // stemização de cada termo. String stem = stem(term); context.write(new Text(stem), new Text(term)); } if (tokenStream != null) { tokenStream.close(); } } } Reducer: Nesta classe é feito um totalizador contando a ocorrência de cada stem, e para esse exemplo escreve no contexto os stems que ocorrem mais de 4 vezes. Códifo fonte StemReducer.Java: package trabalhoTecAvancadas; import java.io.IOException; import java.io.StringReader; import java.util.HashSet; import java.util.Set; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.lucene.analysis.ASCIIFoldingFilter; import org.apache.lucene.analysis.LowerCaseFilter; import org.apache.lucene.analysis.StopFilter; import org.apache.lucene.analysis.TokenStream; import org.apache.lucene.analysis.br.BrazilianAnalyzer; import org.apache.lucene.analysis.br.BrazilianStemFilter; import org.apache.lucene.analysis.standard.ClassicTokenizer; import org.apache.lucene.analysis.tokenattributes.CharTermAttribute; import org.apache.lucene.util.Version; public class StemMapper extends Mapper<LongWritable, Text, Text, Text> { public static String stem(String term) throws IOException { TokenStream tokenStream = null; try { // tokenize tokenStream = new ClassicTokenizer(Version.LUCENE_36, new StringReader(term)); // stem tokenStream = new BrazilianStemFilter(tokenStream); // add each token in a set, so that duplicates are removed Set<String> stems = new HashSet<String>(); CharTermAttribute token = tokenStream.getAttribute(CharTermAttribute.class); tokenStream.reset(); while (tokenStream.incrementToken()) { stems.add(token.toString()); } // if no stem or 2+ stems have been found, return null if (stems.size() != 1) { return null; } String stem = stems.iterator().next(); // if the stem has non-alphanumerical chars, return null if (!stem.matches("[a-zA-Z0-9-]+")) { return null; } return stem; } finally { if (tokenStream != null) { tokenStream.close(); } } } @Override public void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String input = value.toString(); //remove pontuacao input = input.replaceAll("[\\p{Punct}&&[^'-]]+", " "); // usa lucene para filtrar o conteúdo que não é chave. TokenStream tokenStream = null; // tokenize input tokenStream = new ClassicTokenizer(Version.LUCENE_36, new StringReader(input)); // to lowercase tokenStream = new LowerCaseFilter(Version.LUCENE_36, tokenStream); // convert any char to ASCII tokenStream = new ASCIIFoldingFilter(tokenStream);// remove portuguese stop words tokenStream = new StopFilter(Version.LUCENE_36, tokenStream, BrazilianAnalyzer.getDefaultStopSet()); CharTermAttribute token = tokenStream.getAttribute(CharTermAttribute.class); tokenStream.reset(); while (tokenStream.incrementToken()) { String term = token.toString(); // stemização de cada termo. String stem = stem(term); context.write(new Text(stem), new Text(term)); } if (tokenStream != null) { tokenStream.close(); } } } Código fonte do configurador do job Stem.java: package trabalhoTecAvancadas; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; public class Stem extends Configured implements Tool{ public int run(String[] args) throws Exception { if (args.length != 3) { System.err.printf("Usage: %s [generic options] <input><output>\n", getClass().getSimpleName()); ToolRunner.printGenericCommandUsage(System.err); return -1; } Configuration conf = this.getConf(); Job job = new Job(conf); job.setJarByClass(Stem.class); job.setJobName("Stem Keyword"); FileInputFormat.addInputPath(job, new Path(args[1])); FileOutputFormat.setOutputPath(job, new Path(args[2])); job.setMapperClass(StemMapper.class); job.setReducerClass(StemReducer.class); //job.setCombinerClass(StemReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setSortComparatorClass(LongWritable.DecreasingComparator.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); return job.waitForCompletion(true) ? 0 : 1; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new Stem(), args); System.exit(exitCode); } } Dificuldades encontradas: Depuração da aplicação: Como é necessário exportar o o jar e executar somente no ambiente HADOOP como job, qualquer teste e depuração do código fica um processo lento e cansativo. Para contornar essa dificuldade foi criada uma classe de teste JUnit e hadoop.mrunit utilizando os drivers de teste, desta maneira foi possível debugar o código do Mapper e do Reducer visualizando os valores de variáveis nos pontos de interrupção e imprimindo saídas quando necessário. Código fonte tester.java package trabalhoTester; import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mrunit.mapreduce.MapDriver; import org.apache.hadoop.mrunit.mapreduce.MapReduceDriver; import org.apache.hadoop.mrunit.mapreduce.ReduceDriver; import org.junit.Before; import org.junit.Test; import junit.framework.TestCase; import trabalhoTecAvancadas.StemMapper; import trabalhoTecAvancadas.StemReducer; public class tester extends TestCase{ MapDriver<LongWritable, Text, Text, Text> mapDriver; ReduceDriver<Text, Text, Text, IntWritable> reducerdriver; MapReduceDriver<LongWritable, Text, Text, Text, Text, IntWritable> mapreducedriver; private ArrayWritable ocurr = new ArrayWritable(Text.class); @Before public void setUp() { StemMapper mapper = new StemMapper(); mapDriver = MapDriver.newMapDriver(new StemMapper()); StemReducer reducer = new StemReducer(); reducerdriver = ReduceDriver.newReduceDriver(new StemReducer()); mapreducedriver = MapReduceDriver.newMapReduceDriver(mapper, reducer); } @Test public void testSimple() throws Exception { mapreducedriver.withMapper(new StemMapper()); //mapreducedriver.withInput(new LongWritable(1), new Text("A armazenados armazenamento armazenado")); //mapreducedriver.withInput(new LongWritable(1), new Text("Zettabytes (16 bilhões de Gigabytes - GB). Mas a projeção para 2020 é de 44")); mapreducedriver.withInput(new LongWritable(1), new Text("A quantidade de dados armazenados no mundo está estimada em 16\r\n" + "Zettabytes (16 bilhões de Gigabytes - GB). Mas a projeção para 2020 é de 44\r\n" + "ZB. Somente o Facebook armazena 7 Petabytes (PB) por mês (IDC, 2016),\r\n" + "sendo que 1 PB equivale a 1000 TB. São textos, sons, imagens, vídeos, enfim,\r\n" + "tem-se o que se chama Big Data, que é definida em função de 3 V’s:\r\n" + "- Grande Volume de dados (GB; TB; PB; ZB);\r\n" + "- Variados formatos (texto, imagem, vídeo etc.);\r\n" + "- Velocidade exponencial de crescimento.\r\n" + "Para que se possa processar grandes massas de dados, opta-se pelo\r\n" + "paralelismo de dados e controle (PACHECO, 1997), o que evita\r\n" + "redimensionamento da capacidade de processamento (aumento de memória,\r\n" + "CPU’s mais rápidas, e assim por diante) a cada incremento na quantidade de\r\n" + "dados a processar. Um crescimento exponencial da quantidade a ser\r\n" + "processada, levaria a uma constante readequação do sistema de computação.\r\n" + "Visando processar dados de forma paralela, pode-se contar com um\r\n" + "agrupamento de computadores, ou cluster, que deve oferecer comunicação de\r\n" + "dados entre processos paralelos, balanceamento da carga e tolerância a falhas,\r\n" + "e com Hadoop®, um framework para processamento em ambiente de Big\r\n" + "Data. Desenvolvido pelo Yahoo!, atualmente é um projeto de código aberto\r\n" + "(open source) de alto nível mantido pela The Apache™ Software Foundation.\r\n" + "Um cluster Hadoop é uma plataforma de software que processa - de forma\r\n" + "eficiente – um grande volume de informação, utilizando um grupo (cluster) de\r\n" + "computadores (\"nós\" do cluster) trabalhando em paralelo, com os dados sendo\r\n" + "distribuídos pelos nós, de forma replicada. Assim, caso ocorra uma falha em um\r\n" + "nó, o dado replicado que está em outro nó é copiado para o nó onde a falha\r\n" + "ocorreu.")); mapreducedriver.withReducer(new StemReducer()); Writable[] itemsArray = new Writable[1]; itemsArray[0] = new Text("16"); ocurr.set(itemsArray); mapreducedriver.withOutput(new Text("clust"), new IntWritable(4)); mapreducedriver.withOutput(new Text("dad"), new IntWritable(9)); mapreducedriver.withOutput(new Text("no"), new IntWritable(4)); mapreducedriver.withOutput(new Text("par"), new IntWritable(4)); mapreducedriver.withOutput(new Text("paralel"), new IntWritable(4)); mapreducedriver.withOutput(new Text("process"), new IntWritable(8)); mapreducedriver.runTest(); } } Exportação do arquivo JAR incluindo as dependências do Apache Lucene: Ao exportar o arquivo JAR da maneira orientada pela apostila as dependências não eram exportadas sendo necessário então exportar um JAR Executável: Figura 2 - Exportação do JAR com dependências (Lucene) Configuração da saída do Mapper: Caso não se informe as classes de saída do Mapper na configuração o Job considera que as saídas do Mapper e do Reducer são iguais, então foi necessário ajustar: job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); Execução: Os arquivos foram copiados para a VM e pelo SSH o arquivo de entrada (apostila.txt) foi compactado e enviado para o HDFS: Figura 3 - Cópia dos arquivos O arquivo JAR foi executado pelo YARN sem erros: Figura 4 - Execução da aplicação com YARN E o arquivo part-r-00000 foi gerado com a saída esperada ordenada pela chave: Figura 5 – Resultado do MAPREDUCE Comparando com a saída da depuração do testador JUnit onde conseguimos imprimir a lista de termos que geraram cada um dos stems: Figura 6 – Resultado usandoa classe de teste JUnit Veja que essa sentença: “Processamento de dados paralelos para cluster em nós.” Seria reduzia aos mesmos stems extraídos, já que qualquer combinação entre os termos listados seria extraído da mesma maneira, o que demonstra a validade da técnica mesmo que aplicada de maneira tão simplória. Conclusão: O modelo proposto funcionou bem a título de exemplo, mas é uma apresentação incompleta de uma funcionalidade, o próprio Apache Lucene tem integrações com desenho mais adequado para fazer funcionalidades parecidas ou muito mais avançadas integradas com HADOOP. O Processo de Mapping e Reducing foi compreendido durante a execução desse projeto e algumas perguntas ficaram ainda sem resposta – o que já era esperado – já que haverá continuidade do aprendizado. Algumas das perguntas a serem respondidas em aprendizado posterior: · Como agrupar valores a partir da chave; · Como ordenar pelos valores (e não pela chave); · Como usar diversos Mappers em diversas fontes de dados distintas; Bibliografia: Stack Overflow – Acesso 22/10/2019 Java library for keywords extraction from input text https://stackoverflow.com/questions/17447045/java-library-for-keywords-extraction-from-input-text Medium – Acesso 28/10/2019 https://medium.com/botsbrasil/o-que-%C3%A9-o-processamento-de-linguagem-natural-49ece9371cff Apache Lucene – Acesso 22/10/2019 http://lucene.apache.org/ Tutoriais Cloudera – Acesso 20/10/2019 https://www.cloudera.com/tutorials.html Hadooptutorial – Acesso 25/10/2019 http://hadooptutorial.info/ Apache Hadoop – Acesso 20/10/2019 https://hadoop.apache.org/ Marven Repository – Acesso 20/10/2019 https://mvnrepository.com/ Apostila da Disciplina.
Compartilhar