Действительно ли sample_n является случайным образцом при использовании с sparklyr?

У меня 500 миллионов строк в искровом фрейме. Мне интересно использовать sample_n из dplyr потому что это позволит мне явно указать размер выборки, который я хочу. Если бы я использовал sparklyr::sdf_sample(), мне сначала нужно было бы вычислить sdf_nrow(), а затем создать указанную долю данных sample_size/nrow, а затем передать эту долю в sdf_sample. Это не очень важно, но sdf_nrow() может занять некоторое время.

Таким образом, было бы идеально использовать dplyr::sample_n() напрямую. Однако после некоторого тестирования это не выглядит так, как sample_n() является случайным. Фактически, результаты идентичны head() ! Было бы серьезной проблемой, если бы вместо выборочных строк случайным образом функция просто возвращала первые n строк.

Может ли кто-нибудь еще подтвердить это? Является ли sdf_sample() моим лучшим вариантом?

# install.packages("gapminder")

library(gapminder)
library(sparklyr)
library(purrr)

sc <- spark_connect(master = "yarn-client")

spark_data <- sdf_import(gapminder, sc, "gapminder")


> # Appears to be random
> spark_data %>% sdf_sample(fraction = 0.20, replace = FALSE) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
 sample_mean
 <dbl>
1 58.83397


> spark_data %>% sdf_sample(fraction = 0.20, replace = FALSE) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
 sample_mean
 <dbl>
1 60.31693


> spark_data %>% sdf_sample(fraction = 0.20, replace = FALSE) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
 sample_mean
 <dbl>
1 59.38692
> 
> 
> # Appears to be random
> spark_data %>% sample_frac(0.20) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
 sample_mean
 <dbl>
1 60.48903


> spark_data %>% sample_frac(0.20) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
 sample_mean
 <dbl>
1 59.44187


> spark_data %>% sample_frac(0.20) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
 sample_mean
 <dbl>
1 59.27986
> 
> 
> # Does not appear to be random
> spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
 sample_mean
 <dbl>
1 57.78434


> spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
 sample_mean
 <dbl>
1 57.78434


> spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp))
# Source: lazy query [?? x 1]
# Database: spark_connection
 sample_mean
 <dbl>
1 57.78434
> 
> 
> 
> # === Test sample_n() ===
> sample_mean <- list()
> 
> for(i in 1:20){
+ 
+ sample_mean[i] <- spark_data %>% sample_n(300) %>% summarise(sample_mean = mean(lifeExp)) %>% collect() %>% pull()
+ 
+ }
> 
> 
> sample_mean %>% flatten_dbl() %>% mean()
[1] 57.78434
> sample_mean %>% flatten_dbl() %>% sd()
[1] 0
> 
> 
> # === Test head() ===
> spark_data %>% 
+ head(300) %>% 
+ pull(lifeExp) %>% 
+ mean()
[1] 57.78434
</dbl></dbl></dbl></dbl></dbl></dbl></dbl></dbl></dbl>
1 ответ

Это не. Если вы проверите план выполнения (функция optimizedPlan как определено здесь), вы увидите, что это всего лишь предел:

spark_data %>% sample_n(300) %>% optimizedPlan()
<jobj[168]>
 org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
 GlobalLimit 300
+- LocalLimit 300
 +- InMemoryRelation [country#151, continent#152, year#153, lifeExp#154, pop#155, gdpPercap#156], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), 'gapminder'
 +- Scan ExistingRDD[country#151,continent#152,year#153,lifeExp#154,pop#155,gdpPercap#156] 
</jobj[168]>

Это подтверждается show_query:

spark_data %>% sample_n(300) %>% show_query()
<sql>
SELECT *
FROM (SELECT *
FROM 'gapminder' TABLESAMPLE (300 rows) ) 'hntcybtgns'
</sql>

и визуализированный план выполнения:

Наконец, если вы проверите источник Spark, вы увидите, что этот случай реализован с помощью простого LIMIT:

case ctx: SampleByRowsContext =>
 Limit(expression(ctx.expression), query)

Я считаю, что эта семантика была унаследована от Hive, где эквивалентный запрос занимает n первых строк из каждого разбиения на вход.

На практике получение образца точного размера является очень дорогостоящим, и вам следует избегать, если это строго необходимо (так же, как большие LIMITS).

licensed under cc by-sa 3.0 with attribution.