
Develop the first Flink app

2022-07-19 09:25:00 51CTO

Welcome to visit my GitHub

All of Xinchen's original works (including supporting source code) are categorized and summarized here: https://github.com/zq2599/blog_demos

Step list

  • The hands-on steps are as follows:
  1. Create the application;
  2. Write the code;
  3. Build the jar;
  4. Submit the job to Flink and verify that it works;

Environment information

  1. Flink: 1.7;
  2. OS of the Flink machine: CentOS Linux release 7.5.1804;
  3. Development environment JDK: 1.8.0_181;
  4. Development environment Maven: 3.5.0;

Introduction to the application's functionality

  • In the article 《Flink1.7 From installation to experience》, we ran Flink's SocketWindowWordCount.jar, which reads strings from a socket and counts how many times each word occurs. Today we will code this application ourselves and implement the same functionality;

Create the application

  • The application skeleton is created with the mvn command. Enter the following at the command line:
mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.0

     
  • Follow the prompts on the console to enter groupId, artifactId, version, package and so on. After confirming each with Enter, a folder named after the artifactId you entered is generated; it is a Maven project (a non-interactive variant of this command is sketched after the console output below):
Define value for property 'groupId': com.bolingcavalry
Define value for property 'artifactId': socketwordcountdemo
Define value for property 'version' 1.0-SNAPSHOT: :
Define value for property 'package' com.bolingcavalry: :
Confirm properties configuration:
groupId: com.bolingcavalry
artifactId: socketwordcountdemo
version: 1.0-SNAPSHOT
package: com.bolingcavalry
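  • If you would rather skip the interactive prompts, the same project can be generated in batch mode. This is only a sketch using the values entered above; the extra -D flags pre-fill the properties that the interactive run asks for:
mvn archetype:generate \
  -DarchetypeGroupId=org.apache.flink \
  -DarchetypeArtifactId=flink-quickstart-java \
  -DarchetypeVersion=1.7.0 \
  -DgroupId=com.bolingcavalry \
  -DartifactId=socketwordcountdemo \
  -Dversion=1.0-SNAPSHOT \
  -Dpackage=com.bolingcavalry \
  -DinteractiveMode=false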

     
  • Import this Maven project into IDEA. It already contains two classes: BatchJob and StreamingJob. BatchJob is for batch processing and is not used in this exercise, so it can be deleted, keeping only the stream-processing StreamingJob.
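  • For reference, the pom.xml generated by the flink-quickstart-java archetype should already declare the core Flink dependencies, roughly along these lines (the artifact names below assume the Scala 2.11 build of Flink 1.7.0; check the generated file rather than copying this sketch):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.7.0</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.0</version>
</dependency>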

  • The application has been created successfully; now we can start coding;

Coding

  • You can also download the project's source code directly from GitHub; the addresses are listed below:
Name | Link | Remarks
Project home page | https://github.com/zq2599/blog_demos | the project's home page on GitHub
Git repository (https) | https://github.com/zq2599/blog_demos.git | repository address of the project's source code, https protocol
Git repository (ssh) | git@github.com:zq2599/blog_demos.git | repository address of the project's source code, ssh protocol
  • This git project contains multiple folders; the source code for this chapter is in the socketwordcountdemo folder;

  • Next, start coding:

  • In the StreamingJob class, add a static inner class named WordWithCount. This is a POJO used to hold a word and the number of times it occurs:

	/**
	 * POJO that records a word and how many times it has occurred
	 */
	public static class WordWithCount {
		/** The word itself */
		public String word;

		/** Number of occurrences */
		public long count;

		public WordWithCount() {
			super();
		}

		public WordWithCount(String word, long count) {
			this.word = word;
			this.count = count;
		}

		/**
		 * Show the word and its count
		 * @return e.g. "flink : 2"
		 */
		@Override
		public String toString() {
			return word + " : " + count;
		}
	}
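  • The public no-argument constructor and public fields are what let Flink treat WordWithCount as a POJO, which is why keyBy("word") can later reference the field by name. The toString() output is also what will appear in the job's Stdout later on; a quick, hypothetical check (not part of the project code):
	// prints: flink : 2
	System.out.println(new WordWithCount("flink", 2L));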

     
  • Write all the business logic in the main method of the StreamingJob class, as shown below, with comments added at the key positions:
public static void main(String[] args) throws Exception {

		// set up the stream execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// the data source is local port 9999, with newline as the delimiter;
		// hostname and port could also be passed in as arguments of the main method
		// (see the ParameterTool sketch after this code block)
		DataStream<String> text = env.socketTextStream("localhost", 9999, "\n");

		// transform the text DataStream into a new DataStream:
		// split each incoming string into words and emit a WordWithCount object for every word
		DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() {
			@Override
			public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
				for(String word : s.split("\\s")){
					collector.collect(new WordWithCount(word, 1L));
				}
			}
		})
		.keyBy("word")					// key by the "word" field
		.timeWindow(Time.seconds(5))	// tumbling time window of five seconds
		.reduce(new ReduceFunction<WordWithCount>() {	// reduce strategy: sum the counts of the same word
			@Override
			public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception {
				return new WordWithCount(a.word, a.count+b.count);
			}
		});

		// print the results with a parallelism of 1 (single thread)
		windowCounts.print().setParallelism(1);

		// execute the job
		env.execute("Flink Streaming Java API Skeleton");
	}
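  • For completeness, the code above relies on the following imports at the top of StreamingJob (paths as of Flink 1.7); the archetype may have generated some of them already, and the rest can be added manually or via the IDE:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;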

     
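  • As noted in the comment above socketTextStream, the hostname and port do not have to be hard-coded. Below is a minimal sketch using Flink's ParameterTool; the argument names "hostname" and "port" are just examples chosen here, not anything mandated by Flink:
// additional import
import org.apache.flink.api.java.utils.ParameterTool;

// inside main(), before creating the source:
final ParameterTool params = ParameterTool.fromArgs(args);
final String hostname = params.get("hostname", "localhost");
final int port = params.getInt("port", 9999);
DataStream<String> text = env.socketTextStream(hostname, port, "\n");
  • The job would then be started with program arguments such as: --hostname 192.168.1.103 --port 9999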

Build

  • In the directory containing the pom.xml file, execute the command:
mvn clean package -U

     
  • After the command completes, the file socketwordcountdemo-1.0-SNAPSHOT.jar in the target directory is the successfully built jar package;
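  • As an alternative to the web UI used below, the jar can also be submitted with the flink command-line client; a sketch assuming the class and jar names from this article (adjust the jar path to wherever you copied it):
./bin/flink run -c com.bolingcavalry.StreamingJob socketwordcountdemo-1.0-SNAPSHOT.jar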

Verify on Flink

  • On the Flink machine, start a listener on port 9999 with nc; the job will read its input from this socket:
nc -l 9999

     
  • The IP address of my Flink machine is 192.168.1.103, so the address of the Flink web UI in the browser is: http://192.168.1.103:8081;

  • In the web UI, choose the jar file built above to submit as a new task;

  • Click "upload" to upload the file;

  • At this point only the jar file has been uploaded; next, manually set the entry class and start the task. The entry class is the fully qualified name of the StreamingJob class above, i.e. com.bolingcavalry.StreamingJob;

  • After submission, the page shows that a job is already running;

  • Go back to the console of the Flink machine, type some English sentences into the window where nc -l 9999 was entered earlier, then press Enter, for example:

[root@vostro flink-1.7.0]# ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host vostro.
Starting taskexecutor daemon on host vostro.
[root@vostro flink-1.7.0]# nc -l 9999
Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink has been designed to run in all common cluster environments, perform computations at in-memory speed and at any scale.


     
  • Now let's look at the job's output. Click "Task Managers" on the left; there is only one entry in the list on the right; click it;

  • Three tabs appear; click the "Stdout" tab to see the task's word-count statistics for the sentence entered earlier;

  • With that, the first and simplest Flink application is complete.

Welcome to follow the 51CTO blog: Xinchen the programmer

  On the road of learning, you are not alone; Xinchen's original works will keep you company all the way…
