Data engineering has become one of the most sought-after roles in today's data-driven world.
Your data engineering interviews will test your coding and system design skills, along with your ability to work with big data tools and cloud infrastructure.
These are some of the most common data engineering interview questions. We compiled this list with real examples from candidates and input from data engineering interviewers.
Let's get started!
Data engineers must write efficient, clean code that manipulates data at scale.
Most interviewers will assess your foundations in data structures and algorithms. In short, can you develop performant solutions to complex problems?
You are given the head of a doubly linked list.
Using merge sort, write a function to sort the linked list in ascending or descending order.
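One way to answer (a sketch, not the only valid approach) is to split the list with the slow/fast-pointer technique, recursively sort each half, and merge the halves while maintaining the prev pointers. The Node class and field names below are assumptions, since the prompt doesn't fix an interface; flipping the comparison in merge gives descending order.
class Node:
    def __init__(self, val=0, prev=None, next=None):
        self.val = val
        self.prev = prev
        self.next = next

def sort_list(head):
    # Base case: zero or one node is already sorted
    if head is None or head.next is None:
        return head
    # Split the list in half using slow/fast pointers
    slow, fast = head, head.next
    while fast and fast.next:
        slow, fast = slow.next, fast.next.next
    second = slow.next
    slow.next = None
    second.prev = None
    # Sort each half, then merge (use >= instead of <= in merge for descending order)
    return merge(sort_list(head), sort_list(second))

def merge(a, b):
    dummy = Node()
    tail = dummy
    while a and b:
        if a.val <= b.val:
            node, a = a, a.next
        else:
            node, b = b, b.next
        tail.next, node.prev = node, tail
        tail = node
    rest = a if a else b
    tail.next = rest
    if rest:
        rest.prev = tail
    head = dummy.next
    head.prev = None
    return head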
Your program is running slowly because it's accessing data from disk over and over again. To improve the performance, you want to build a simple key-value store to cache this data in memory, but you also want to limit the amount of memory used. You decide to build a caching system that only keeps the N most recently used items—also known as a least recently used (LRU) cache. Write a class LRUCache(n) that accepts a size limit n. It should support a set(key, value) method for inserting or updating items and a get(key) method for retrieving items. Can you implement a solution where both of these methods run in O(1) time?
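One common O(1) approach pairs a hash map with a doubly linked list to track recency; in Python, collections.OrderedDict packages both in one structure. A minimal sketch (the set/get names follow the prompt; returning None on a cache miss is an assumption the prompt doesn't specify):
from collections import OrderedDict

class LRUCache:
    def __init__(self, n):
        self.capacity = n
        self.cache = OrderedDict()  # key -> value, ordered by recency of use

    def get(self, key):
        if key not in self.cache:
            return None  # Assumption: return None on a miss
        # Mark the key as most recently used
        self.cache.move_to_end(key)
        return self.cache[key]

    def set(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        # Evict the least recently used item if over capacity
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)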
Let's say we have a long list of unsorted numbers (potentially millions), and we want to find the M largest numbers contained in it. Implement a function find_largest(input, m) that will find and return the largest m values given an input array or file. If the input array is empty, return None (Python) or null.
def find_largest(input_list, m):
    # Check for edge cases
    if not input_list or m <= 0:
        return None
    # Initialize list to store the largest m values
    largest_values = []
    for num in input_list:
        if len(largest_values) < m:
            # Add to the list if we haven't found m elements yet
            largest_values.append(num)
        else:
            # Find the smallest element in largest_values
            min_val = min(largest_values)
            if num > min_val:
                # Replace the smallest element if current num is larger
                min_index = largest_values.index(min_val)
                largest_values[min_index] = num
    # Optional: Sort in descending order
    return sorted(largest_values, reverse=True)
# Example usage:
input_list = [3, 1, 5, 6, 8, 2, 9, 10, 7]
m = 3
print(find_largest(input_list, m)) # Output should be [10, 9, 8]
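The scan above is O(n × m) because it calls min() on every replacement. For very large inputs, a bounded min-heap (Python's heapq module) brings this down to roughly O(n log m). A sketch using the same inputs:
import heapq

def find_largest_heap(input_list, m):
    if not input_list or m <= 0:
        return None
    # heapq.nlargest maintains a min-heap of size m internally and returns the results in descending order
    return heapq.nlargest(m, input_list)

print(find_largest_heap([3, 1, 5, 6, 8, 2, 9, 10, 7], 3))  # [10, 9, 8]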
Write a function sudokuSolve that checks whether a given sudoku board is solvable. If so, the function returns true. Otherwise (i.e. there is no valid solution to the given sudoku board), it returns false.
This code is a solution for solving a Sudoku puzzle.
def get_candidates(board, row, col):
    # Collect all digits that don't collide with the cell's row, column, or 3x3 box
    candidates = []
    for digit in '123456789':
        collision = False
        for i in range(9):
            if (board[row][i] == digit or
                    board[i][col] == digit or
                    board[(row - row % 3) + i // 3][(col - col % 3) + i % 3] == digit):
                collision = True
                break
        if not collision:
            candidates.append(digit)
    return candidates

def sudoku_solve(board):
    # Pick the empty cell with the fewest candidates (most constrained first)
    row, col, candidates = -1, -1, None
    for r in range(9):
        for c in range(9):
            if board[r][c] == '.':
                new_candidates = get_candidates(board, r, c)
                if candidates is None or len(new_candidates) < len(candidates):
                    candidates = new_candidates
                    row, col = r, c
    # No empty cells left: the board is solved
    if candidates is None:
        return True
    # Try each candidate and backtrack on failure
    for val in candidates:
        board[row][col] = val
        if sudoku_solve(board):
            return True
        board[row][col] = '.'
    return False
You are given an integer array coins representing different coin denominations and an integer amount representing a total amount of money. Write a function coinChange that returns the fewest number of coins needed to make up that amount. If that amount cannot be made up by any combination of the coins, return -1. You may assume that you have an infinite number of each kind of coin.
The following code solves the "coin change" problem using dynamic programming.
from typing import List

def coin_change(coins: List[int], amount: int) -> int:
    # Initialize DP array with a value greater than the maximum possible number of coins needed
    dp = [float('inf')] * (amount + 1)
    dp[0] = 0  # Base case: 0 coins needed to make amount 0
    # Process each amount from 1 to the given amount
    for i in range(1, amount + 1):
        for coin in coins:
            if i - coin >= 0:
                dp[i] = min(dp[i], dp[i - coin] + 1)
    # If dp[amount] is still infinity, it means it's not possible to form the amount
    return dp[amount] if dp[amount] != float('inf') else -1
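For example, with coins [1, 2, 5] and an amount of 11, the minimum is three coins (5 + 5 + 1), while an amount of 3 with only a 2-unit coin is impossible:
print(coin_change([1, 2, 5], 11))  # 3
print(coin_change([2], 3))         # -1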
Given the head of a linked list, write a function hasCycle to determine if the linked list has a cycle in it. A linked list is said to have a cycle if a node's next pointer points to a previous node in the list, forming a loop. Return true if there is a cycle, otherwise return false.
The solution below detects whether a linked list contains a cycle using Floyd's tortoise-and-hare algorithm, which traverses the list with two pointers, slow and fast, moving at different speeds.
class ListNode:
    def __init__(self, val=0, next=None):
        self.val = val
        self.next = next

def has_cycle(head: ListNode) -> bool:
    slow = head
    fast = head
    while fast and fast.next:
        slow = slow.next       # Move slow pointer by 1 step
        fast = fast.next.next  # Move fast pointer by 2 steps
        if slow == fast:
            return True        # A cycle is detected
    return False               # No cycle detected
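A quick sanity check, building a three-node list and then linking the tail back to the head to form a cycle:
a, b, c = ListNode(1), ListNode(2), ListNode(3)
a.next, b.next = b, c
print(has_cycle(a))  # False: the list ends at c
c.next = a           # Link the tail back to the head, creating a cycle
print(has_cycle(a))  # True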
Strong SQL skills are essential for any data engineering interview. You’ll be expected to write complex queries that efficiently extract and manipulate large volumes of data from relational databases.
Given the database with the schema shown below, write a SQL query to fetch the top earning employee by department, ordered by department name.
employees

| Column | Data Type |
|---|---|
| id | int |
| first_name | varchar |
| last_name | varchar |
| salary | int |
| department_id | int |

projects

| Column | Data Type |
|---|---|
| id | int |
| title | varchar |
| start_date | date |
| end_date | date |
| budget | int |

departments

| Column | Data Type |
|---|---|
| id | int |
| name | varchar |

employees_projects

| Column | Data Type |
|---|---|
| project_id | int |
| employee_id | int |

Query Result Format

| department_name | employee_id | first_name | last_name | salary |
|---|---|---|---|---|
| varchar | int | varchar | varchar | int |
To fetch the top-earning employee by department, ordered by department name, you can use the following SQL query:
WITH ranked_employees AS (
SELECT
e.id AS employee_id,
e.first_name,
e.last_name,
e.salary,
d.name AS department_name,
ROW_NUMBER() OVER (PARTITION BY e.department_id ORDER BY e.salary DESC) AS rank
FROM
employees e
JOIN
departments d ON e.department_id = d.id
)
SELECT
department_name,
employee_id,
first_name,
last_name,
salary
FROM
ranked_employees
WHERE
rank = 1
ORDER BY
department_name;
Given a tweets table with tweet_id, user_id, msg, and tweet_date columns, group users by the number of tweets they posted in 2022 and count the number of users in each group.
WITH tweet_cte AS (
    SELECT user_id, COUNT(*) AS tweet_bucket
    FROM tweets
    WHERE EXTRACT(YEAR FROM tweet_date) = 2022
    GROUP BY user_id
)
SELECT tweet_bucket, COUNT(*) AS users_num
FROM tweet_cte
GROUP BY tweet_bucket;
Given post and post_user tables, write an SQL query that shows the success rate of a post (%) when the user's previous post had failed. The post table contains post_id, post_date, user_id, interface, and is_successful_post. The post_user table contains user_id, user_type, and age. Your output should have the following columns: user_id and next_post_sc_rate (the success rate of a post when the user's previous post had failed). Order results by increasing next_post_sc_rate.
WITH post_seq AS (
    SELECT
        p.user_id,
        p.post_id,
        ROW_NUMBER() OVER (PARTITION BY p.user_id ORDER BY p.post_date) AS post_seq_id,
        p.is_successful_post
    FROM post AS p
),
post_pairings AS (
    SELECT
        ps.user_id,
        ps.post_seq_id AS failed_post_seq_id,
        ps.post_seq_id + 1 AS next_post_seq_id
    FROM post_seq AS ps
    WHERE ps.is_successful_post = 0
)
SELECT
    pp.user_id,
    ROUND(SUM(p2.is_successful_post) * 1.0 / COUNT(p2.is_successful_post), 2) AS next_post_sc_rate
FROM post_pairings AS pp
JOIN post_seq AS p2
    ON pp.user_id = p2.user_id
    AND pp.next_post_seq_id = p2.post_seq_id
GROUP BY pp.user_id
ORDER BY next_post_sc_rate ASC;
You work for a leading game development company where players can team up and compete. Each player's performance in different game sessions is recorded as distinct score entries in the database. You're provided a players table with player_id, player_name, and team_id columns and a scores table with score_id, player_id, and game_score. Write a SQL query to return the top 2 players from each team based on their single highest score across all sessions. If multiple players share the same highest score, include all of them, which may result in more than two top players for some teams.
WITH PlayerMaxScores AS (
SELECT
p.team_id,
p.player_name,
MAX(s.game_score) AS max_score
FROM
players p
JOIN
scores s ON p.player_id = s.player_id
GROUP BY
p.team_id, p.player_name
),
RankedPlayers AS (
SELECT
team_id,
player_name,
max_score,
DENSE_RANK() OVER (PARTITION BY team_id ORDER BY max_score DESC) AS rank
FROM
PlayerMaxScores
)
SELECT
team_id,
player_name,
max_score
FROM
RankedPlayers
WHERE
rank <= 2
ORDER BY
team_id, max_score DESC, player_name;
Data engineers frequently design and implement ETL (Extract, Transform, Load) processes to move data from various sources to target data stores. You'll need a solid grasp of data transformation, orchestration, and error handling.
How would you implement an incremental update mechanism in a daily ETL pipeline?
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('etl_job').getOrCreate()
# Load historical data
historical_df = spark.read.parquet("/path/to/historical_data")
# Load new records
new_data_df = spark.read.csv("/path/to/daily_data.csv", header=True)
# Assuming each record has a unique id field 'record_id',
# and 'update_time' field to track modifications.
# Define incremental load by filtering only new or updated records
latest_df = new_data_df.filter(new_data_df.update_time > historical_df.agg({"update_time": "max"}).first()[0])
# Write to target, either append or insert into partition
latest_df.write.mode("append").parquet("/path/to/historical_data")
How would you handle schema evolution in an ETL pipeline that extracts data from constantly changing APIs?
This PySpark code handles schema evolution when new data contains additional or missing columns.
# Example of schema evolution handling using PySpark
from pyspark.sql import functions as F
dataframe = spark.read.json("/path/to/new_data.json")
# Add default placeholders for columns missing from the new data
default_df = dataframe.withColumn("new_column", F.lit(None))
# Append the new data alongside the existing files
default_df.write.mode("append").parquet("/path/to/target_data")
# For Parquet, 'mergeSchema' is a read-time option: it reconciles files written with different schemas
merged_df = spark.read.option("mergeSchema", "true").parquet("/path/to/target_data")
Write a custom transformation function to clean data using Python that eliminates null or inconsistent records.
This function cleans a DataFrame by handling missing values and date parsing:
import pandas as pd

def clean_data(df):
    # Drop records missing required fields
    df_cleaned = df.dropna(subset=["user_id", "purchase_date"])
    # Parse dates; invalid formats become NaT
    df_cleaned["purchase_date"] = pd.to_datetime(
        df_cleaned["purchase_date"], errors="coerce"
    )
    # Drop records whose dates could not be parsed
    df_cleaned.dropna(subset=["purchase_date"], inplace=True)
    return df_cleaned

cleaned_dataframe = clean_data(input_dataframe)
How would you implement a data deduplication mechanism in an ETL job that handles real-time streaming records?
The PySpark code below processes a streaming DataFrame and handles the deduplication of records using watermarks:
# Assuming Kafka stream produces records with a unique UUID identifier
deduplicated_stream = incoming_stream \
.withWatermark("event_timestamp", "10 minutes") \
.dropDuplicates(["record_id"])
deduplicated_stream.writeStream\
.format("parquet")\
.option("path", "/path/to/output")\
.start()
Explain an approach for efficient backfilling of missing data in a pipeline.
Backfilling starts with deciding how to treat the missing (null) values themselves. Common ways to handle nulls in Spark include:
1. Filtering null values
2. Replacing null values with defaults
3. Dropping rows with null values
4. Using coalesce to fall back to the first non-null column
# Create a sample DataFrame with null values
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.appName("NullHandling").getOrCreate()
data = [(1, "Alice"), (2, None), (3, "Bob"), (None, "Eve")]
df = spark.createDataFrame(data, ["id", "name"])
# Filter rows where the 'name' column is NOT null
df_filtered = df.filter(col("name").isNotNull())
df_filtered.show()
# Replace null values in 'name' column with "Unknown"
df_replaced = df.fillna({"name": "Unknown", "id": -1})
df_replaced.show()
The examples below show the different ways of dropping rows that contain null values:
# Drop rows with any null values
df_dropped_any = df.dropna()
df_dropped_any.show()
# Drop rows if all values in the row are null
df_dropped_all = df.dropna(how="all")
df_dropped_all.show()
# Drop rows with less than 1 non-null value (thresh=1 means at least 1 non-null value must be present)
df_dropped_thresh = df.dropna(thresh=1)
df_dropped_thresh.show()
from pyspark.sql.functions import coalesce
# Create a sample DataFrame with multiple columns, some containing nulls
data = [(1, None, "Alice"), (2, "M", None), (3, None, "Bob")]
df_multi = spark.createDataFrame(data, ["id", "gender", "name"])
# Use coalesce to select the first non-null value in the specified columns
df_coalesced = df_multi.withColumn("final_name", coalesce("name", "gender", "id"))
df_coalesced.show()
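Null handling covers the values themselves; backfilling usually also means re-running the pipeline only for the missing date partitions instead of reprocessing all history. Below is a minimal sketch under assumed paths and a hypothetical transform() function, reusing the spark session from the earlier examples:
from datetime import date, timedelta

def backfill(transform, start: date, end: date):
    """Reprocess one daily partition at a time across the missing range, oldest first."""
    current = start
    while current <= end:
        partition = current.strftime("%Y-%m-%d")
        daily_df = spark.read.parquet(f"/path/to/source/date={partition}")
        result_df = transform(daily_df)
        # Overwrite only the affected partition so already-correct data is untouched
        result_df.write.mode("overwrite").parquet(f"/path/to/target/date={partition}")
        current += timedelta(days=1)

# Example: backfill the first week of January with a hypothetical transform
# backfill(my_transform, date(2024, 1, 1), date(2024, 1, 7))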
Data engineering is about building systems that process massive volumes of data efficiently and reliably. System design interviews will test your ability to think holistically about how different components fit together in a scalable architecture.
Write a Spark job that reads a large Parquet file, performs aggregations, and writes it back as a Parquet file.
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AggregationJob").getOrCreate()
# Read Parquet File
input_df = spark.read.parquet("/s3/path/to/data")
# Perform Aggregation
aggregated_df = input_df.groupBy("user_id").agg({"sku_count": "sum"})
# Write back to Parquet
aggregated_df.write.mode("overwrite").parquet("/s3/path/to/output")
Write a Kafka consumer using Python to read messages of user activity and process them.
from kafka import KafkaConsumer
import json
# Create Kafka consumer
consumer = KafkaConsumer(
'user_activity',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='user_activity_group',
value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
user_activity = message.value
# Perform processing (e.g., store to database, analytics)
print(user_activity)
This Python code snippet uses the KafkaConsumer class from the kafka-python library to consume messages from a Kafka topic. It subscribes to the user_activity topic, starts reading from the earliest available offset, automatically commits offsets as part of the user_activity_group consumer group, deserializes each message from JSON, and processes each record as it arrives (here, by simply printing it).
How would you implement a Spark Streaming job that listens to Kafka events and writes to Cassandra?
This PySpark code snippet establishes a streaming data pipeline that reads events from a Kafka topic and writes them to a Cassandra database:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
.appName("KafkaToCassandra") \
.getOrCreate()
# Reading the data stream via Kafka
kafkaStream = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "events") \
.load()
# Transformation logic: from_json needs an explicit schema for the event payload
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
event_schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_timestamp", TimestampType()),
    StructField("event_type", StringType()),
])
kafkaStream = kafkaStream.withColumn("event_data", from_json(col("value").cast("string"), event_schema))
transformed_df = kafkaStream.select(
    col("event_data.user_id"),
    col("event_data.event_timestamp"),
    col("event_data.event_type")
)
# Write to Cassandra using the Spark-Cassandra Connector
# (structured streaming sinks require a checkpoint location for fault tolerance)
transformed_df.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .option("keyspace", "user_ks") \
    .option("table", "user_events") \
    .start()
Design a Flink job for processing sensor data in real-time and trigger alerts for anomalies.
The following Python code snippet uses Apache Flink to create a streaming application that detects anomalies in sensor data based on temperature readings.
from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
# Source: Stream data from sensors
sensor_data_stream = env.add_source(your_source_function())
# Process: Identify temperature anomalies in sensor data
THRESHOLD = 100.0  # example threshold value

def detect_anomaly(sensor_data):
    # Filter predicates must return a boolean: keep only anomalous readings
    is_anomaly = sensor_data['temperature'] > THRESHOLD
    if is_anomaly:
        print(f"Anomaly detected in sensor: {sensor_data}")
    return is_anomaly

processed_data_stream = sensor_data_stream.filter(detect_anomaly)
# Sink: Trigger alerting system (or log)
processed_data_stream.print()
env.execute("Sensor Anomaly Detection")
Explain how to implement a large-scale distributed join in Spark without running out of memory (OOM), using partitioning.
The code below repartitions both tables on the join key so that matching rows land in the same partitions, which keeps each task's memory footprint small when joining large datasets.
# Hash partitioning both tables on the same key
table1_partitioned = table1.repartition(100, "join_key")
table2_partitioned = table2.repartition(100, "join_key")
# Perform the join operation efficiently
joined_df = table1_partitioned.join(table2_partitioned, "join_key")
# Write the joined result
joined_df.write.parquet("/path_to_output")
Write a Spark job that demonstrates how to force Spark to use a broadcast join and a sort-merge join when joining two DataFrames.
from pyspark.sql import SparkSession
from pyspark.sql.functions import broadcast
spark = SparkSession.builder.appName("JoinStrategies").getOrCreate()
# Create sample DataFrames
df_large = spark.range(1000000).withColumnRenamed("id", "key")
df_small = spark.range(100).withColumnRenamed("id", "key")
# Broadcast join (forces a broadcast join for the smaller DataFrame)
df_broadcast_join = df_large.join(broadcast(df_small), on="key")
print("Broadcast Join Plan:")
df_broadcast_join.explain() # Look for 'BroadcastHashJoin' in the physical plan
# Sort-Merge join (forces a sort-merge join by disabling broadcast threshold)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
df_sort_merge_join = df_large.join(df_small, on="key")
print("Sort-Merge Join Plan:")
df_sort_merge_join.explain() # Look for 'SortMergeJoin' in the physical plan
Write a code example demonstrating how to use repartition and coalesce to modify the number of partitions for a DataFrame in Spark.
df = spark.range(100000)
# Use repartition to increase the number of partitions to 20 (full shuffle)
df_repartitioned = df.repartition(20)
print(f"Number of partitions after repartition: {df_repartitioned.rdd.getNumPartitions()}")
# Use coalesce to reduce the number of partitions to 5 (no shuffle)
df_coalesced = df_repartitioned.coalesce(5)
print(f"Number of partitions after coalesce: {df_coalesced.rdd.getNumPartitions()}")
Create a DataFrame and demonstrate how to write it using both bucketing and partitioning. Explain how each affects file storage.
data = [("Alice", "Math", 85), ("Bob", "English", 90), ("Alice", "Science", 95)]
df = spark.createDataFrame(data, ["name", "subject", "score"])
# Write with bucketing
df.write.bucketBy(5, "name").saveAsTable("bucketed_table")
# Write with partitioning
df.write.partitionBy("subject").mode("overwrite").parquet("/tmp/partitioned_table")
# Verify how each write was stored
print("Bucketed Table Metadata:")
spark.sql("DESCRIBE EXTENDED bucketed_table").show(truncate=False)  # Bucket count and columns appear in the table metadata, not the directory layout
print("Partitioned Table Directory Structure:")
spark.read.parquet("/tmp/partitioned_table").show()  # Data is read back across the subject=<value>/ subdirectories
Partitioning writes one subdirectory per distinct value of the partition column (for example, subject=Math/), which enables partition pruning at read time. Bucketing instead hashes the bucket column into a fixed number of files, recorded in the table metadata rather than the directory structure, which helps with joins and aggregations on that column.
Implement a map-side join using a broadcast join in Spark to optimize joining a small lookup DataFrame with a large DataFrame.
# Large DataFrame
df_large = spark.range(1000000).withColumnRenamed("id", "user_id")
# Small lookup DataFrame
df_lookup = spark.createDataFrame([(1, "Gold"), (2, "Silver"), (3, "Bronze")], ["user_id", "membership"])
# Perform a map-side join using broadcast
df_joined = df_large.join(broadcast(df_lookup), "user_id", "left")
df_joined.show()
Write a Spark job to detect skewness in a DataFrame by calculating the distribution of a specific column. Then, handle skewness by applying repartitionByRange.
from pyspark.sql.functions import col
# Sample DataFrame with skewed data
data = [(1, "A"), (1, "B"), (1, "C"), (2, "D"), (3, "E")]
df_skewed = spark.createDataFrame(data, ["key", "value"])
# Calculate distribution to detect skewness
df_skewed.groupBy("key").count().orderBy(col("count").desc()).show()
# Repartition by range to manage skewness
df_balanced = df_skewed.repartitionByRange(3, "key")
print(f"Partitioning after repartitionByRange: {df_balanced.rdd.glom().map(len).collect()}")
Write a code example to handle skewed data by applying salting before joining two DataFrames.
from pyspark.sql.functions import expr, explode, array, lit

# Original skewed DataFrames (key 1 is the hot key)
df1 = spark.createDataFrame([(1, "A"), (1, "B"), (2, "C")], ["key", "value1"])
df2 = spark.createDataFrame([(1, "D"), (1, "E"), (2, "F")], ["key", "value2"])

num_salts = 3  # the salt range

# Salt the skewed side: spread rows of each hot key across num_salts buckets
df1_salted = df1.withColumn("salt", expr(f"floor(rand() * {num_salts})"))

# Replicate the other side once per salt value so every match is preserved
df2_salted = df2.withColumn("salt", explode(array(*[lit(i) for i in range(num_salts)])))

# Join on both key and salt to reduce skew, then drop the helper column
df_joined = df1_salted.join(df2_salted, ["key", "salt"], "inner").drop("salt")
df_joined.show()
Write a Spark job to demonstrate how to use both static and dynamic partitioning while writing a DataFrame.
For Hive tables, hive.exec.dynamic.partition.mode must be set to "nonstrict" so that partition values can be taken from the data (dynamic partitioning) rather than fixed explicitly in the statement (static partitioning).
data = [("Alice", "2023-01", 85), ("Bob", "2023-02", 90), ("Alice", "2023-01", 95)]
df = spark.createDataFrame(data, ["name", "date", "score"])
# Static partitioning: write into one explicitly chosen partition
df.filter(df.date == "2023-01").write.mode("overwrite").parquet("/tmp/static_partitioned_table/date=2023-01")
# Dynamic partitioning: partition values are derived from the data itself
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
df.write.mode("overwrite").partitionBy("name", "date").parquet("/tmp/dynamic_partitioned_table")
For data engineering candidates, understanding Dimensional Modeling is crucial, especially in the context of data warehousing. Dimensional modeling is a data structure technique that enables efficient data storage, retrieval, and analysis, optimized specifically for data warehousing tools.
At the core of dimensional modeling are facts and dimensions. Facts represent the key measurements or metrics derived from business processes, such as sales or revenue figures. In contrast, dimensions provide context for these facts, offering descriptive information that allows data to be sliced and analyzed, such as customer details or time data.
Attributes define the characteristics of these dimensions, adding granularity to the model. Within this structure, a fact table serves as the central table, containing the main quantitative data, while dimension tables provide the surrounding context. Facts can be categorized into three types: additive, non-additive, and semi-additive. Mastering these concepts is essential for a data engineer, as they form the foundation of building robust, query-optimized data warehouses that support business intelligence and analytics.
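As a small illustration (the table and column names here are invented for the example), a star schema pairs a central fact table of measures with surrounding dimension tables, which you join to slice the facts, for instance total sales by country:
# Dimension tables: descriptive context
dim_customer = spark.createDataFrame(
    [(1, "Alice", "US"), (2, "Bob", "UK")],
    ["customer_id", "customer_name", "country"],
)
dim_date = spark.createDataFrame(
    [(20240101, "2024-01-01", "Q1"), (20240102, "2024-01-02", "Q1")],
    ["date_id", "calendar_date", "quarter"],
)

# Fact table: additive measures keyed by the dimensions
fact_sales = spark.createDataFrame(
    [(1, 20240101, 100.0), (2, 20240101, 250.0), (1, 20240102, 75.0)],
    ["customer_id", "date_id", "sales_amount"],
)

# Slice the facts by a dimension attribute: total sales per country
fact_sales.join(dim_customer, "customer_id") \
    .groupBy("country") \
    .agg({"sales_amount": "sum"}) \
    .show()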
We hope this gives you a good sense of what to expect in your data engineering interviews.
Best of luck with your upcoming interview!