Q 1. What is the retention period of a Kafka topic?
Answer : Kafka retains data only for the configured retention period; events or messages older than that are deleted and are no longer available for consumers to consume. For example, if the retention period is 7 days, you cannot get data older than 7 days.
Q 2. A job that reads data from a Kafka topic and then processes it is failing because the offsets it has to read are older than the retention period. How do you handle it?
Answer : Set the option failOnDataLoss to false. The job will then no longer fail with an error, but you would lose the expired data.
Below is the description of the option from the Spark Structured Streaming + Kafka integration guide:
failOnDataLoss: Whether to fail the query when it’s possible that data is lost (e.g., topics are deleted, or offsets are out of range). This may be a false alarm. You can disable it when it doesn’t work as you expected. Batch queries will always fail if it fails to read any data from the provided offsets due to lost data.
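For reference, here is a minimal sketch of how the option is passed when creating a Kafka source in Structured Streaming; the bootstrap server and topic name are placeholders:
val kafkaDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")   // placeholder broker address
  .option("subscribe", "sampleTopic1")                    // placeholder topic
  .option("failOnDataLoss", "false")                      // do not fail when offsets have expired
  .load()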
Q 3. Which mechanism [Push or Pull] is used by the Kafka consumer to read data from brokers?
Answer : Data is pulled from the broker by the consumer. If it were pushed by the brokers to the consumers, the rate would be controlled by the broker and not by the consumer. With a pull model, each consumer can read from the broker at its own maximum rate, independent of other consumers which might be slower or faster. That is why the pull mechanism is used.
Q 4. Is it possible to read data from a Kafka topic starting at a fixed offset using the command line?
Answer : Yes, it is possible. Each partition of the topic has its own offsets, so you specify both the partition and the offset. To read data from a fixed offset use the below command.
Just change m and n as per your requirement: --partition m --offset n
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic sampleTopic1 --property print.key=true --partition 0 --offset 12
Q 5. There is a JSON file with the following content:
{"dept_id":101,"e_id":[10101,10102,10103]}
{"dept_id":102,"e_id":[10201,10202]}
And the data is loaded into a Spark dataframe, say mydf, with the below dtypes:
dept_id: bigint, e_id: array<bigint>
What is the best way to get each e_id individually along with its dept_id?
Answer :
We can use the explode function, which produces one row per item in e_id.
The code would be:
mydf.withColumn("e_id", explode($"e_id"))
Here the new column reuses the old column name, so the dtypes of the resulting dataframe will be
dept_id: bigint, e_id: bigint
So the output would look like:
+-------+-----+
|dept_id| e_id|
+-------+-----+
|    101|10101|
|    101|10102|
|    101|10103|
|    102|10201|
|    102|10202|
+-------+-----+
Q 6. How many columns will be present in df2, if df1 has three columns a1, a2, a3 and df2 is created as below?
var df2 = df1.withColumn("b1", lit("a1")).withColumn("a1", lit("a2")).withColumn("a2", $"a2").withColumn("b2", $"a3").withColumn("a3", lit("b1"))
Answer :
Total 5, as shown below:
df1                              // a1, a2, a3
  .withColumn("b1", lit("a1"))   // a1, a2, a3, b1
  .withColumn("a1", lit("a2"))   // a1, a2, a3, b1  (a1 is replaced, not added)
  .withColumn("a2", $"a2")       // a1, a2, a3, b1  (a2 is replaced, not added)
  .withColumn("b2", $"a3")       // a1, a2, a3, b1, b2
  .withColumn("a3", lit("b1"))   // a1, a2, a3, b1, b2  (a3 is replaced, not added)
Q 7 . How to get RDD with its element indices.
Say myrdd = (a1,b1,c1,s2,s5)
Output should be
((a1,0),(b1,1),(c1,2),(s2,3),(s5,4))
Answer :
We can use the zipWithIndex function, which pairs each element with its index (a Long):
var myrdd_windx = myrdd.zipWithIndex()
Q 8. What is the use of query.awaitTermination() in Structured Streaming?
Answer : In batch processing, we generally load all the data, process it, and store the result once. In real-time streaming, data arrives continuously in micro-batches (usually based on a processing-time trigger), so the streaming query must keep running until it is terminated by a failure or an explicit stop. query.awaitTermination() blocks the driver's main thread so the query keeps processing the real-time data instead of exiting as soon as start() returns.
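A minimal, self-contained sketch of a standalone application (using the built-in rate source instead of a real input, just for illustration):
import org.apache.spark.sql.SparkSession

object AwaitTerminationDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("awaitTermination-demo").getOrCreate()

    // the built-in "rate" source keeps generating rows, standing in for a real stream
    val streamDf = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    val query = streamDf.writeStream
      .format("console")
      .outputMode("append")
      .start()

    // without this call main() would return immediately and the query would stop;
    // awaitTermination() blocks until the query is stopped manually or fails
    query.awaitTermination()
  }
}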
Q 9 . Spark automatically monitors cache usage on each node and drops out old data partitions. What is the manual way of doing it ?
Answer : The rdd.unpersist() method removes the cached data.
Q 10. In Spark SQL, what would be the output of the below statement?
SELECT true <=> NULL;
Answer : false
Q 11. Is it possible to load images into a Spark dataframe?
Answer : Yes, using the below command:
spark.read.format("image").load("path of image")
The resulting dataframe has a fixed set of columns (a single image struct column), in which the pixel data is stored in binary format.
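A small sketch of what you get back (the directory path is a placeholder):
val imageDf = spark.read.format("image").load("/data/images")   // placeholder path
imageDf.printSchema()
// root
//  |-- image: struct
//  |    |-- origin: string      (source file path)
//  |    |-- height: integer
//  |    |-- width: integer
//  |    |-- nChannels: integer
//  |    |-- mode: integer
//  |    |-- data: binary        (the pixel data)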
Q 12. In your Spark application there is a dependency on a class that is present in a jar file (abcd.jar) somewhere in your cluster. You don't have a fat jar for your application. How would you use it?
Answer : While submitting the Spark job using spark-submit, we can pass abcd.jar with --jars. That way it is distributed to the driver and executors, and our main application can use it.
And if we want to use it in spark-shell, we can add it to the classpath with
:require abcd.jar
Q 13. There is some Scala code written in a file myApp.scala. Is it possible to run the complete code in spark-shell without manually copying the code?
Answer : Yes, it is possible to run it without copying. We just need to put the file in the directory from which we started spark-shell, and inside the shell run the below command:
:load myApp.scala
You can mention the complete path if the file is present somewhere else. This is useful for testing application code before building a jar.
Q 14. You have a dataframe mydf with three columns a1, a2, a3, but you need column a2 to have the new name b2. How would you do it?
Answer : Spark dataframes have a function to rename a column: withColumnRenamed. It takes two arguments: the first is the existing column name and the second is the new column name.
So the syntax would be:
var newdf = mydf.withColumnRenamed("a2", "b2")
Q 15. Suppose you have two dataframes df1 and df2, both with the below columns:
df1 => id , name, mobno
df2 => id ,pincode, address, city
After joining both dataframes on the key id, while selecting id, name, mobno, pincode, address, city you get the error "ambiguous column id". How would you resolve it?
Answer: Which id column to keep depends on the type of join being performed.
- if it is an inner join, the ids of df1 and df2 will have the same values, so before selecting we can drop either one, for example:
var joined_df = df1.join(df2, df1("id") === df2("id")).drop(df2("id"))
OR
var joined_df = df1.join(df2, df1("id") === df2("id")).drop(df1("id"))
- if it is a left join, then df2("id") can be null for unmatched rows, so we drop that one:
var joined_df = df1.join(df2, df1("id") === df2("id"), "left").drop(df2("id"))
- if it is a right join, then df1("id") can be null for unmatched rows, so we drop that one:
var joined_df = df1.join(df2, df1("id") === df2("id"), "right").drop(df1("id"))
- if it is a full join, then we can rename one of the id columns before joining and use both as needed; one way is shown in the sketch below.
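A minimal sketch of the full-join case, assuming df2's key is renamed to df2_id (a name chosen only for this example):
import org.apache.spark.sql.functions.coalesce

val df2Renamed = df2.withColumnRenamed("id", "df2_id")
val joined_df = df1.join(df2Renamed, df1("id") === df2Renamed("df2_id"), "full")
// both key columns are now unambiguous; merge them back into a single id if needed
val result_df = joined_df.withColumn("id", coalesce(df1("id"), df2Renamed("df2_id")))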
Q 16. You have a list of columns which you need to select from a dataframe. The list gets updated every time you run the application, but the base dataframe (say bsdf) remains the same. How would you select only the columns present in the list for that run?
Answer : Let's say the list of required columns is mycols. If mycols is a list of column names (strings), map each name to a Column first and then pass it to select:
var newdf = bsdf.select(mycols.map(col): _*)
If mycols already holds Column objects, bsdf.select(mycols: _*) works directly. Either way, newdf will have a different schema on every run, depending on mycols.
Q 17. You have a dataframe df1 and a list of qualified cities where you need to run the offers, but df1 has records for all the cities where your business runs. How would you get the records only for qualified cities?
Answer : We can use the filter function with isin: records whose city is present in the qualified list are kept, and the rest are dropped.
var qualified_records = df1.filter($"city".isin(qualified_cities: _*))
Q 18. There are 50 columns in one Spark dataframe, say df. All the columns need to be cast to string, but to keep the code generic it is not recommended to cast each column by writing out its name. How would you achieve it in Spark using Scala?
Answer : As we have the dataframe df, we can get all of its column names using the columns function. Then, using map, we can cast them dynamically, and the resulting list can be used in select.
The syntax will be:
var casted_list = df.columns.map(x => col(x).cast("string"))
var castedDf = df.select(casted_list: _*)
You can verify the schema of castedDf using
castedDf.printSchema
Q 19. Suppose you are running a Spark job 3 to 4 times every day, and it loads data into a Hive table. What would be the best approach to distinguish the data on the basis of the time when it was loaded?
Answer : We can create a Hive table partitioned on, say, batchtime, which is simply a column generated while inserting data into the Hive table.
We can use the below command to get the current time, which will act as batchtime in the Hive table:
var batchtime=System.currentTimeMillis()
And the dataframe that stores data into the partitioned table can carry a batchtime column, which acts as the partition column:
df.withColumn("batchtime", lit(batchtime))
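A short sketch of the write, assuming a target table name of mydb.mytable (hypothetical):
import org.apache.spark.sql.functions.lit

df.withColumn("batchtime", lit(batchtime))   // batchtime from System.currentTimeMillis() above
  .write
  .mode("append")
  .partitionBy("batchtime")                  // batchtime becomes the partition column
  .saveAsTable("mydb.mytable")               // hypothetical table name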
Q 20. Assume you want to add a unique id to each record of a dataframe. How would you achieve it?
Answer : We can use monotonically_increasing_id() inside withColumn. The generated ids are guaranteed to be unique and increasing, but not consecutive.
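A one-line sketch (the column name uid is arbitrary):
import org.apache.spark.sql.functions.monotonically_increasing_id

val dfWithId = df.withColumn("uid", monotonically_increasing_id())
// ids are unique and increasing but not consecutive, because the partition id is encoded in the upper bits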
Q 21. How will you get an HDFS file into a local directory?
Answer :
Using the command
hadoop fs -get <hdfs_path> <local_dir>
Q 22. How would you see the running applications in YARN from the command line? And how will you kill an application?
Answer :
yarn application -list
yarn application -kill <application_id>
Q 23. Say you have data from a website containing information about logged-in users. One user may have multiple fields, but the number of fields per user may vary based on their actions. In that case, which Hadoop component would you use to store the data?
Answer : HBase, a NoSQL database, because it handles a sparse and variable set of columns per row well.
Q 24. Say you have one HBase table. Is it possible to create a Hive table on top of it without any manual data movement, so that any change in the HBase table is reflected in the Hive table?
Answer : Yes, we can achieve it by creating a Hive table that points to HBase as its data source. In that case, if there is any change in the HBase data, it will be reflected in Hive as well.
We need to use the HBase storage handler (org.apache.hadoop.hive.hbase.HBaseStorageHandler) while creating the table.
Q 25. Assume data from external sources is being loaded into HDFS in CSV format on a daily basis. How would you handle it efficiently so that it can be processed by other applications while also reducing the data storage?
Answer : Load it into Hive using a compressed columnar format such as ORC or Parquet, delete the old raw CSV data from HDFS once it has been loaded, and partition the Hive table on a business date column (e.g. partdate).
Q 26. There are 5,000,000 records in one Hive table and you have loaded it in spark-shell for development purposes. What would be the best practice while writing the code? Would you process all 5,000,000 records on each line of code?
Answer : In that case we can use the limit function (say 1000 records), cache the result, and develop against that. When the complete code is ready, we can process all of the data.
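For example, a sketch of that development workflow (the table name is hypothetical):
val df = spark.table("mydb.big_table")   // hypothetical table with ~5,000,000 records
val devDf = df.limit(1000).cache()
devDf.count()                            // first action materializes the cache
// develop and test transformations against devDf, then switch back to df for the full run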
Q 27. Assume that you want to load a file having timestamp values (yyyy-MM-dd HH:mm:ss) into Apache Pig and, after loading into Pig, add one day to each timestamp value. How will you achieve this?
Answer : For a full explanation please read
https://bigdataprogrammers.com/load-timestamp-values-file-pig/
Q 28 . Suppose you have a spark dataframe which contains millions of records. You need to perform multiple actions on it. How will you minimise the execution time?
Answer : You can use cache or persist. For example, say you have a dataframe df; if you use df1 = df.cache(), then df1 is marked for caching. Once the data has been cached, multiple actions can be performed on it, and only the first action takes longer than the others, because that is when the data is actually cached. You can check the storage size of df1 from the Spark application tracker.
You can pass different storage levels to persist.
Different storage levels are :
MEMORY_ONLY
DISK_ONLY
MEMORY_AND_DISK
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER
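A short sketch of persist with an explicit storage level:
import org.apache.spark.storage.StorageLevel

val df1 = df.persist(StorageLevel.MEMORY_AND_DISK)
df1.count()       // the first action is the slow one: it materializes the cache
df1.show(10)      // later actions reuse the cached data
df1.unpersist()   // release the storage when you are done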
Q 29. What happens to a Spark application which you have run using spark-submit when you press Ctrl+C? Consider two cases:
A. If the deploy mode is client
B. If the deploy mode is cluster
Answer : When submitted in client mode, the driver runs on the machine from which you submitted the Spark application, so when Ctrl+C is pressed the whole application is killed, because the driver's execution is killed. In cluster mode, by contrast, the driver runs on one of the cluster's worker nodes, so even if you press Ctrl+C the application keeps running.
Q 30. How would you load the data of a Hive table into a Spark dataframe?
Answer: You can use spark.table("table name") to get the dataframe.
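A minimal sketch, assuming Hive support is enabled and mydb.mytable is a hypothetical table name:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hive-read-demo")
  .enableHiveSupport()                      // needed so spark.table can see the Hive metastore
  .getOrCreate()

val hiveDf = spark.table("mydb.mytable")    // equivalently: spark.sql("SELECT * FROM mydb.mytable")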
Q 31. How would you name the Spark application so that you can track it?
Answer : You can use the --name "appName" parameter while submitting the application using spark-submit.
Q 32. If your cluster has limited resources and there are many applications which need to run, how would you ensure that your Spark application takes a fixed amount of resources and hence does not impact the execution of other applications?
Answer : While submitting the Spark application, pass these two parameters:
--num-executors 10
--conf spark.dynamicAllocation.enabled=false
Note: you can change the number of executors if you need.
Q 33. How would you limit the number of records (say 1000) in a Spark dataframe?
Answer : You can use df.limit(1000) function to limit the number of rows.
Q 34. Give an example to describe map and flatMap on an RDD.
Answer : Let's say below is an RDD of String:
scala> val rdd = sc.parallelize(Seq("java python scala", "sql C C++ Kotlin"))
If you use flatMap with an inline function that splits on spaces, the output is a flat Array of strings, and the output count may or may not be the same as the input count.
scala> rdd.flatMap(x => x.split(" ")).collect()
res3: Array[String] = Array(java, python, scala, sql, C, C++, Kotlin)
But if you use map with the same inline split function, then each element produces an array, resulting in an Array[Array[String]], and the count is the same as the count of the RDD.
scala> rdd.map(x => x.split(" ")).collect()
res4: Array[Array[String]] = Array(Array(java, python, scala), Array(sql, C, C++, Kotlin))
Q 35. How will you convert an RDD to a dataframe?
Answer: Using the below function:
var mydf = spark.createDataFrame(myrdd, schema)
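A self-contained sketch, with illustrative column names and types:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val myrdd = spark.sparkContext.parallelize(Seq(Row("a1", 1), Row("b1", 2)))
val schema = StructType(Seq(
  StructField("name", StringType, nullable = true),
  StructField("value", IntegerType, nullable = true)
))
val mydf = spark.createDataFrame(myrdd, schema)
// alternatively, with import spark.implicits._, an RDD of case classes or tuples can use rdd.toDF()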